Feature/UI sideload (#2658)

* ui sideloading

* remove subtlecrypto import

* fix parser

* misc fixes

* allow docker pull during compat conversion
This commit is contained in:
Aiden McClelland
2024-06-28 15:03:01 -06:00
committed by GitHub
parent c16d8a1da1
commit 822dd5e100
101 changed files with 1901 additions and 797 deletions

View File

@@ -28,6 +28,7 @@ pub struct AccountInfo {
pub root_ca_key: PKey<Private>,
pub root_ca_cert: X509,
pub ssh_key: ssh_key::PrivateKey,
pub compat_s9pk_key: ed25519_dalek::SigningKey,
}
impl AccountInfo {
pub fn new(password: &str, start_time: SystemTime) -> Result<Self, Error> {
@@ -39,6 +40,7 @@ impl AccountInfo {
let ssh_key = ssh_key::PrivateKey::from(ssh_key::private::Ed25519Keypair::random(
&mut rand::thread_rng(),
));
let compat_s9pk_key = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
Ok(Self {
server_id,
hostname,
@@ -47,6 +49,7 @@ impl AccountInfo {
root_ca_key,
root_ca_cert,
ssh_key,
compat_s9pk_key,
})
}
@@ -61,6 +64,7 @@ impl AccountInfo {
let root_ca_key = cert_store.as_root_key().de()?.0;
let root_ca_cert = cert_store.as_root_cert().de()?.0;
let ssh_key = db.as_private().as_ssh_privkey().de()?.0;
let compat_s9pk_key = db.as_private().as_compat_s9pk_key().de()?.0;
Ok(Self {
server_id,
@@ -70,6 +74,7 @@ impl AccountInfo {
root_ca_key,
root_ca_cert,
ssh_key,
compat_s9pk_key,
})
}
@@ -92,6 +97,9 @@ impl AccountInfo {
db.as_private_mut()
.as_ssh_privkey_mut()
.ser(Pem::new_ref(&self.ssh_key))?;
db.as_private_mut()
.as_compat_s9pk_key_mut()
.ser(Pem::new_ref(&self.compat_s9pk_key))?;
let key_store = db.as_private_mut().as_key_store_mut();
key_store.as_onion_mut().insert_key(&self.tor_key)?;
let cert_store = key_store.as_local_certs_mut();

View File

@@ -85,6 +85,7 @@ impl OsBackupV0 {
ssh_key::Algorithm::Ed25519,
)?,
tor_key: TorSecretKeyV3::from(self.tor_key.0),
compat_s9pk_key: ed25519_dalek::SigningKey::generate(&mut rand::thread_rng()),
},
ui: self.ui,
})
@@ -113,6 +114,7 @@ impl OsBackupV1 {
root_ca_cert: self.root_ca_cert.0,
ssh_key: ssh_key::PrivateKey::from(Ed25519Keypair::from_seed(&self.net_key.0)),
tor_key: TorSecretKeyV3::from(ed25519_expand_key(&self.net_key.0)),
compat_s9pk_key: ed25519_dalek::SigningKey::from_bytes(&self.net_key),
},
ui: self.ui,
}
@@ -124,13 +126,14 @@ impl OsBackupV1 {
#[serde(rename = "kebab-case")]
struct OsBackupV2 {
server_id: String, // uuidv4
hostname: String, // <adjective>-<noun>
root_ca_key: Pem<PKey<Private>>, // PEM Encoded OpenSSL Key
root_ca_cert: Pem<X509>, // PEM Encoded OpenSSL X509 Certificate
ssh_key: Pem<ssh_key::PrivateKey>, // PEM Encoded OpenSSH Key
tor_key: TorSecretKeyV3, // Base64 Encoded Ed25519 Expanded Secret Key
ui: Value, // JSON Value
server_id: String, // uuidv4
hostname: String, // <adjective>-<noun>
root_ca_key: Pem<PKey<Private>>, // PEM Encoded OpenSSL Key
root_ca_cert: Pem<X509>, // PEM Encoded OpenSSL X509 Certificate
ssh_key: Pem<ssh_key::PrivateKey>, // PEM Encoded OpenSSH Key
tor_key: TorSecretKeyV3, // Base64 Encoded Ed25519 Expanded Secret Key
compat_s9pk_key: Pem<ed25519_dalek::SigningKey>, // PEM Encoded ED25519 Key
ui: Value, // JSON Value
}
impl OsBackupV2 {
fn project(self) -> OsBackup {
@@ -143,6 +146,7 @@ impl OsBackupV2 {
root_ca_cert: self.root_ca_cert.0,
ssh_key: self.ssh_key.0,
tor_key: self.tor_key,
compat_s9pk_key: self.compat_s9pk_key.0,
},
ui: self.ui,
}
@@ -155,6 +159,7 @@ impl OsBackupV2 {
root_ca_cert: Pem(backup.account.root_ca_cert.clone()),
ssh_key: Pem(backup.account.ssh_key.clone()),
tor_key: backup.account.tor_key.clone(),
compat_s9pk_key: Pem(backup.account.compat_s9pk_key.clone()),
ui: backup.ui.clone(),
}
}

View File

@@ -156,16 +156,14 @@ async fn restore_packages(
let mut tasks = BTreeMap::new();
for id in ids {
let backup_dir = backup_guard.clone().package_backup(&id);
let s9pk_path = backup_dir.path().join(&id).with_extension("s9pk");
let task = ctx
.services
.install(
ctx.clone(),
S9pk::open(
backup_dir.path().join(&id).with_extension("s9pk"),
Some(&id),
)
.await?,
|| S9pk::open(s9pk_path, Some(&id)),
Some(backup_dir),
None,
)
.await?;
tasks.insert(id, task);

View File

@@ -7,6 +7,7 @@ use clap::Parser;
use color_eyre::eyre::eyre;
use digest::generic_array::GenericArray;
use digest::OutputSizeUser;
use exver::Version;
use models::PackageId;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
@@ -194,7 +195,7 @@ pub async fn list(ctx: RpcContext) -> Result<BTreeMap<BackupTargetId, BackupTarg
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BackupInfo {
pub version: VersionString,
pub version: Version,
pub timestamp: Option<DateTime<Utc>>,
pub package_backups: BTreeMap<PackageId, PackageBackupInfo>,
}
@@ -204,7 +205,7 @@ pub struct BackupInfo {
pub struct PackageBackupInfo {
pub title: String,
pub version: VersionString,
pub os_version: VersionString,
pub os_version: Version,
pub timestamp: DateTime<Utc>,
}
@@ -223,9 +224,9 @@ fn display_backup_info(params: WithIoFormat<InfoParams>, info: BackupInfo) {
"TIMESTAMP",
]);
table.add_row(row![
"EMBASSY OS",
info.version.as_str(),
info.version.as_str(),
"StartOS",
&info.version.to_string(),
&info.version.to_string(),
&if let Some(ts) = &info.timestamp {
ts.to_string()
} else {
@@ -236,7 +237,7 @@ fn display_backup_info(params: WithIoFormat<InfoParams>, info: BackupInfo) {
let row = row![
&*id,
info.version.as_str(),
info.os_version.as_str(),
&info.os_version.to_string(),
&info.timestamp.to_string(),
];
table.add_row(row);

View File

@@ -43,7 +43,9 @@ impl Drop for CliContextSeed {
std::fs::create_dir_all(&parent_dir).unwrap();
}
let mut writer = fd_lock_rs::FdLock::lock(
File::create(&tmp).unwrap(),
File::create(&tmp)
.with_ctx(|_| (ErrorKind::Filesystem, &tmp))
.unwrap(),
fd_lock_rs::LockType::Exclusive,
true,
)
@@ -80,9 +82,12 @@ impl CliContext {
});
let cookie_store = Arc::new(CookieStoreMutex::new({
let mut store = if cookie_path.exists() {
CookieStore::load_json(BufReader::new(File::open(&cookie_path)?))
.map_err(|e| eyre!("{}", e))
.with_kind(crate::ErrorKind::Deserialization)?
CookieStore::load_json(BufReader::new(
File::open(&cookie_path)
.with_ctx(|_| (ErrorKind::Filesystem, cookie_path.display()))?,
))
.map_err(|e| eyre!("{}", e))
.with_kind(crate::ErrorKind::Deserialization)?
} else {
CookieStore::default()
};

View File

@@ -37,7 +37,10 @@ pub trait ContextConfig: DeserializeOwned + Default {
.map(|f| f.parse())
.transpose()?
.unwrap_or_default();
format.from_reader(File::open(path)?)
format.from_reader(
File::open(path.as_ref())
.with_ctx(|_| (ErrorKind::Filesystem, path.as_ref().display()))?,
)
}
fn load_path_rec(&mut self, path: Option<impl AsRef<Path>>) -> Result<(), Error> {
if let Some(path) = path.filter(|p| p.as_ref().exists()) {

View File

@@ -14,7 +14,6 @@ use rpc_toolkit::{CallRemote, Context, Empty};
use tokio::sync::{broadcast, Mutex, RwLock};
use tokio::time::Instant;
use tracing::instrument;
use url::Url;
use super::setup::CURRENT_SECRET;
use crate::account::AccountInfo;

View File

@@ -40,6 +40,7 @@ impl Database {
notifications: Notifications::new(),
cifs: CifsTargets::new(),
package_stores: BTreeMap::new(),
compat_s9pk_key: Pem(account.compat_s9pk_key.clone()),
}, // TODO
})
}

View File

@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use chrono::{DateTime, Utc};
use emver::VersionRange;
use exver::VersionRange;
use imbl_value::InternedString;
use models::{ActionId, DataUrl, HealthCheckId, HostId, PackageId, ServiceInterfaceId};
use patch_db::json_ptr::JsonPointer;

View File

@@ -19,6 +19,8 @@ use crate::util::serde::Pem;
pub struct Private {
pub key_store: KeyStore,
pub password: String, // argon2 hash
#[serde(default = "generate_compat_key")]
pub compat_s9pk_key: Pem<ed25519_dalek::SigningKey>,
pub ssh_privkey: Pem<ssh_key::PrivateKey>,
pub ssh_pubkeys: SshKeys,
pub available_ports: AvailablePorts,
@@ -28,3 +30,7 @@ pub struct Private {
#[serde(default)]
pub package_stores: BTreeMap<PackageId, Value>,
}
/// Serde `default` provider for `Private::compat_s9pk_key` (see the
/// `#[serde(default = "generate_compat_key")]` attribute above): when a stored
/// record predates the field, a fresh random ed25519 signing key is generated
/// and wrapped in `Pem` for PEM-encoded (de)serialization.
fn generate_compat_key() -> Pem<ed25519_dalek::SigningKey> {
Pem(ed25519_dalek::SigningKey::generate(&mut rand::thread_rng()))
}

View File

@@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::net::{Ipv4Addr, Ipv6Addr};
use chrono::{DateTime, Utc};
use emver::VersionRange;
use exver::{Version, VersionRange};
use imbl_value::InternedString;
use ipnet::{Ipv4Net, Ipv6Net};
use isocountry::CountryCode;
@@ -21,7 +21,6 @@ use crate::net::utils::{get_iface_ipv4_addr, get_iface_ipv6_addr};
use crate::prelude::*;
use crate::progress::FullProgress;
use crate::util::cpupower::Governor;
use crate::util::VersionString;
use crate::version::{Current, VersionT};
use crate::{ARCH, PLATFORM};
@@ -43,7 +42,7 @@ impl Public {
arch: get_arch(),
platform: get_platform(),
id: account.server_id.clone(),
version: Current::new().semver().into(),
version: Current::new().semver(),
hostname: account.hostname.no_dot_host_name(),
last_backup: None,
eos_version_compat: Current::new().compat().clone(),
@@ -109,7 +108,8 @@ pub struct ServerInfo {
pub platform: InternedString,
pub id: String,
pub hostname: String,
pub version: VersionString,
#[ts(type = "string")]
pub version: Version,
#[ts(type = "string | null")]
pub last_backup: Option<DateTime<Utc>>,
#[ts(type = "string")]

View File

@@ -8,8 +8,8 @@ use ed25519_dalek::{SigningKey, VerifyingKey};
use tracing::instrument;
use crate::context::CliContext;
use crate::prelude::*;
use crate::util::serde::Pem;
use crate::{Error, ResultExt};
#[instrument(skip_all)]
pub fn init(ctx: CliContext) -> Result<(), Error> {
@@ -26,7 +26,8 @@ pub fn init(ctx: CliContext) -> Result<(), Error> {
secret_key: secret.to_bytes(),
public_key: Some(PublicKeyBytes(VerifyingKey::from(&secret).to_bytes())),
};
let mut dev_key_file = File::create(&ctx.developer_key_path)?;
let mut dev_key_file = File::create(&ctx.developer_key_path)
.with_ctx(|_| (ErrorKind::Filesystem, ctx.developer_key_path.display()))?;
dev_key_file.write_all(
keypair_bytes
.to_pkcs8_pem(base64ct::LineEnding::default())

View File

@@ -102,10 +102,10 @@ fn display_disk_info(params: WithIoFormat<Empty>, args: Vec<DiskInfo>) {
} else {
"N/A"
},
if let Some(eos) = part.start_os.as_ref() {
eos.version.as_str()
&if let Some(eos) = part.start_os.as_ref() {
eos.version.to_string()
} else {
"N/A"
"N/A".to_owned()
},
];
table.add_row(row);

View File

@@ -20,7 +20,7 @@ use super::mount::guard::TmpMountGuard;
use crate::disk::mount::guard::GenericMountGuard;
use crate::disk::OsPartitionInfo;
use crate::util::serde::IoFormat;
use crate::util::{Invoke, VersionString};
use crate::util::Invoke;
use crate::{Error, ResultExt as _};
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
@@ -56,7 +56,7 @@ pub struct PartitionInfo {
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EmbassyOsRecoveryInfo {
pub version: VersionString,
pub version: exver::Version,
pub full: bool,
pub password_hash: Option<String>,
pub wrapped_key: Option<String>,

View File

@@ -3,13 +3,12 @@ use std::path::Path;
use async_compression::tokio::bufread::GzipDecoder;
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::BufReader;
use tokio::process::Command;
use crate::disk::fsck::RequiresReboot;
use crate::prelude::*;
use crate::progress::PhaseProgressTrackerHandle;
use crate::util::io::open_file;
use crate::util::Invoke;
use crate::PLATFORM;
@@ -134,7 +133,7 @@ pub async fn update_firmware(firmware: Firmware) -> Result<(), Error> {
.invoke(ErrorKind::Filesystem)
.await?;
let mut rdr = if tokio::fs::metadata(&firmware_path).await.is_ok() {
GzipDecoder::new(BufReader::new(File::open(&firmware_path).await?))
GzipDecoder::new(BufReader::new(open_file(&firmware_path).await?))
} else {
return Err(Error::new(
eyre!("Firmware {id}.rom.gz not found in {firmware_dir:?}"),

View File

@@ -5,7 +5,7 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::time::{Duration, SystemTime};
use axum::extract::ws::{self, CloseFrame};
use axum::extract::ws::{self};
use color_eyre::eyre::eyre;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
@@ -31,7 +31,7 @@ use crate::progress::{
};
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::ssh::SSH_AUTHORIZED_KEYS_FILE;
use crate::util::io::IOHook;
use crate::util::io::{create_file, IOHook};
use crate::util::net::WebSocketExt;
use crate::util::{cpupower, Invoke};
use crate::Error;
@@ -138,10 +138,7 @@ pub async fn init_postgres(datadir: impl AsRef<Path>) -> Result<(), Error> {
old_version -= 1;
let old_datadir = db_dir.join(old_version.to_string());
if tokio::fs::metadata(&old_datadir).await.is_ok() {
tokio::fs::File::create(&incomplete_path)
.await?
.sync_all()
.await?;
create_file(&incomplete_path).await?.sync_all().await?;
Command::new("pg_upgradecluster")
.arg(old_version.to_string())
.arg("main")

View File

@@ -1,21 +1,21 @@
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use clap::builder::ValueParserFactory;
use clap::{value_parser, CommandFactory, FromArgMatches, Parser};
use color_eyre::eyre::eyre;
use emver::VersionRange;
use futures::StreamExt;
use imbl_value::InternedString;
use exver::VersionRange;
use futures::{AsyncWriteExt, StreamExt};
use imbl_value::{json, InternedString};
use itertools::Itertools;
use patch_db::json_ptr::JsonPointer;
use models::VersionString;
use reqwest::header::{HeaderMap, CONTENT_LENGTH};
use reqwest::Url;
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::HandlerArgs;
use rustyline_async::ReadlineEvent;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::oneshot;
use tracing::instrument;
use ts_rs::TS;
@@ -23,13 +23,14 @@ use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
use crate::db::model::package::{ManifestPreference, PackageState, PackageStateMatchModelRef};
use crate::prelude::*;
use crate::progress::{FullProgress, PhasedProgressBar};
use crate::progress::{FullProgress, FullProgressTracker, PhasedProgressBar};
use crate::registry::context::{RegistryContext, RegistryUrlParams};
use crate::registry::package::get::GetPackageResponse;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::manifest::PackageId;
use crate::s9pk::merkle_archive::source::http::HttpSource;
use crate::s9pk::S9pk;
use crate::upload::upload;
use crate::util::clap::FromStrParser;
use crate::util::io::open_file;
use crate::util::net::WebSocketExt;
use crate::util::Never;
@@ -38,32 +39,33 @@ pub const PKG_PUBLIC_DIR: &str = "package-data/public";
pub const PKG_WASM_DIR: &str = "package-data/wasm";
// #[command(display(display_serializable))]
pub async fn list(ctx: RpcContext) -> Result<Value, Error> {
Ok(ctx.db.peek().await.as_public().as_package_data().as_entries()?
pub async fn list(ctx: RpcContext) -> Result<Vec<Value>, Error> {
Ok(ctx
.db
.peek()
.await
.as_public()
.as_package_data()
.as_entries()?
.iter()
.filter_map(|(id, pde)| {
let status = match pde.as_state_info().as_match() {
PackageStateMatchModelRef::Installed(_) => {
"installed"
}
PackageStateMatchModelRef::Installing(_) => {
"installing"
}
PackageStateMatchModelRef::Updating(_) => {
"updating"
}
PackageStateMatchModelRef::Restoring(_) => {
"restoring"
}
PackageStateMatchModelRef::Removing(_) => {
"removing"
}
PackageStateMatchModelRef::Error(_) => {
"error"
}
PackageStateMatchModelRef::Installed(_) => "installed",
PackageStateMatchModelRef::Installing(_) => "installing",
PackageStateMatchModelRef::Updating(_) => "updating",
PackageStateMatchModelRef::Restoring(_) => "restoring",
PackageStateMatchModelRef::Removing(_) => "removing",
PackageStateMatchModelRef::Error(_) => "error",
};
serde_json::to_value(json!({ "status": status, "id": id.clone(), "version": pde.as_state_info().as_manifest(ManifestPreference::Old).as_version().de().ok()?}))
.ok()
Some(json!({
"status": status,
"id": id.clone(),
"version": pde.as_state_info()
.as_manifest(ManifestPreference::Old)
.as_version()
.de()
.ok()?
}))
})
.collect())
}
@@ -107,65 +109,57 @@ impl std::fmt::Display for MinMax {
}
}
#[derive(Deserialize, Serialize, Parser, TS)]
#[derive(Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct InstallParams {
#[ts(type = "string")]
registry: Url,
id: PackageId,
#[arg(short = 'm', long = "marketplace-url")]
#[ts(type = "string | null")]
registry: Option<Url>,
#[arg(short = 'v', long = "version-spec")]
version_spec: Option<String>,
#[arg(long = "version-priority")]
version_priority: Option<MinMax>,
version: VersionString,
}
// #[command(
// custom_cli(cli_install(async, context(CliContext))),
// )]
#[instrument(skip_all)]
pub async fn install(
ctx: RpcContext,
InstallParams {
id,
registry,
version_spec,
version_priority,
id,
version,
}: InstallParams,
) -> Result<(), Error> {
let version_str = match &version_spec {
None => "*",
Some(v) => &*v,
};
let version: VersionRange = version_str.parse()?;
let registry = registry.unwrap_or_else(|| crate::DEFAULT_MARKETPLACE.parse().unwrap());
let version_priority = version_priority.unwrap_or_default();
let s9pk = S9pk::deserialize(
&Arc::new(
HttpSource::new(
ctx.client.clone(),
format!(
"{}/package/v0/{}.s9pk?spec={}&version-priority={}",
registry, id, version, version_priority,
)
.parse()?,
)
.await?,
),
None, // TODO
)
.await?;
let package: GetPackageResponse = from_value(
ctx.call_remote_with::<RegistryContext, _>(
"package.get",
json!({
"id": id,
"version": VersionRange::exactly(version.deref().clone()),
}),
RegistryUrlParams {
registry: registry.clone(),
},
)
.await?,
)?;
ensure_code!(
&s9pk.as_manifest().id == &id,
ErrorKind::ValidateS9pk,
"manifest.id does not match expected"
);
let asset = &package
.best
.get(&version)
.ok_or_else(|| {
Error::new(
eyre!("{id}@{version} not found on {registry}"),
ErrorKind::NotFound,
)
})?
.s9pk;
let download = ctx
.services
.install(ctx.clone(), s9pk, None::<Never>)
.install(
ctx.clone(),
|| asset.deserialize_s9pk(ctx.client.clone()),
None::<Never>,
None,
)
.await?;
tokio::spawn(async move { download.await?.await });
@@ -193,113 +187,74 @@ pub async fn sideload(
SideloadParams { session }: SideloadParams,
) -> Result<SideloadResponse, Error> {
let (upload, file) = upload(&ctx, session.clone()).await?;
let (id_send, id_recv) = oneshot::channel();
let (err_send, err_recv) = oneshot::channel();
let progress = Guid::new();
let db = ctx.db.clone();
let mut sub = db
.subscribe(
"/package-data/{id}/install-progress"
.parse::<JsonPointer>()
.with_kind(ErrorKind::Database)?,
)
.await;
ctx.rpc_continuations.add(
progress.clone(),
RpcContinuation::ws_authed(&ctx, session,
|mut ws| {
use axum::extract::ws::Message;
async move {
if let Err(e) = async {
let id = match id_recv.await.map_err(|_| {
Error::new(
eyre!("Could not get id to watch progress"),
ErrorKind::Cancelled,
)
}).and_then(|a|a) {
Ok(a) => a,
Err(e) =>{ ws.send(Message::Text(
serde_json::to_string(&Err::<(), _>(RpcError::from(e.clone_output())))
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
return Err(e);
}
};
tokio::select! {
res = async {
while let Some(_) = sub.recv().await {
ws.send(Message::Text(
serde_json::to_string(&if let Some(p) = db
.peek()
.await
.as_public()
.as_package_data()
.as_idx(&id)
.and_then(|e| e.as_state_info().as_installing_info()).map(|i| i.as_progress())
{
Ok::<_, ()>(p.de()?)
} else {
let mut p = FullProgress::new();
p.overall.complete();
Ok(p)
})
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
}
Ok::<_, Error>(())
} => res?,
err = err_recv => {
if let Ok(e) = err {
ws.send(Message::Text(
serde_json::to_string(&Err::<(), _>(e))
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
let progress_tracker = FullProgressTracker::new();
let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200)));
ctx.rpc_continuations
.add(
progress.clone(),
RpcContinuation::ws_authed(
&ctx,
session,
|mut ws| {
use axum::extract::ws::Message;
async move {
if let Err(e) = async {
tokio::select! {
res = async {
while let Some(progress) = progress_listener.next().await {
ws.send(Message::Text(
serde_json::to_string(&Ok::<_, ()>(progress))
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
}
Ok::<_, Error>(())
} => res?,
err = err_recv => {
if let Ok(e) = err {
ws.send(Message::Text(
serde_json::to_string(&Err::<(), _>(e))
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
}
}
}
ws.normal_close("complete").await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("Error tracking sideload progress: {e}");
tracing::debug!("{e:?}");
}
ws.normal_close("complete").await?;
Ok::<_, Error>(())
}
.await
{
tracing::error!("Error tracking sideload progress: {e}");
tracing::debug!("{e:?}");
}
}
},
Duration::from_secs(600),
),
)
.await;
},
Duration::from_secs(600),
),
)
.await;
tokio::spawn(async move {
if let Err(e) = async {
match S9pk::deserialize(
&file, None, // TODO
)
.await
{
Ok(s9pk) => {
let _ = id_send.send(Ok(s9pk.as_manifest().id.clone()));
ctx.services
.install(ctx.clone(), s9pk, None::<Never>)
.await?
.await?
.await?;
file.delete().await
}
Err(e) => {
let _ = id_send.send(Err(e.clone_output()));
return Err(e);
}
}
let key = ctx.db.peek().await.into_private().into_compat_s9pk_key();
ctx.services
.install(
ctx.clone(),
|| crate::s9pk::load(file.clone(), || Ok(key.de()?.0), Some(&progress_tracker)),
None::<Never>,
Some(progress_tracker.clone()),
)
.await?
.await?
.await?;
file.delete().await
}
.await
{
@@ -311,10 +266,16 @@ pub async fn sideload(
Ok(SideloadResponse { upload, progress })
}
/// CLI arguments for looking a package up in the registry; carried by the
/// `Marketplace` variant of `CliInstallParams` (see below in this file).
#[derive(Deserialize, Serialize, Parser)]
pub struct QueryPackageParams {
/// Package to look up.
id: PackageId,
/// Optional version-range constraint for the lookup — presumably
/// unconstrained ("any version") when omitted; TODO confirm against
/// the registry's `package.get` handler.
version: Option<VersionRange>,
}
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum CliInstallParams {
Marketplace(InstallParams),
Marketplace(QueryPackageParams),
Sideload(PathBuf),
}
impl CommandFactory for CliInstallParams {
@@ -328,14 +289,19 @@ impl CommandFactory for CliInstallParams {
.required_unless_present("id")
.value_parser(value_parser!(PathBuf)),
)
.args(InstallParams::command().get_arguments().cloned().map(|a| {
if a.get_id() == "id" {
a.required(false).required_unless_present("sideload")
} else {
a
}
.conflicts_with("sideload")
}))
.args(
QueryPackageParams::command()
.get_arguments()
.cloned()
.map(|a| {
if a.get_id() == "id" {
a.required(false).required_unless_present("sideload")
} else {
a
}
.conflicts_with("sideload")
}),
)
}
fn command_for_update() -> clap::Command {
Self::command()
@@ -346,7 +312,9 @@ impl FromArgMatches for CliInstallParams {
if let Some(sideload) = matches.get_one::<PathBuf>("sideload") {
Ok(Self::Sideload(sideload.clone()))
} else {
Ok(Self::Marketplace(InstallParams::from_arg_matches(matches)?))
Ok(Self::Marketplace(QueryPackageParams::from_arg_matches(
matches,
)?))
}
}
fn update_from_arg_matches(&mut self, matches: &clap::ArgMatches) -> Result<(), clap::Error> {
@@ -355,6 +323,35 @@ impl FromArgMatches for CliInstallParams {
}
}
/// Parameters for the `package.installed-version` RPC: identifies the package
/// whose currently-installed version is being requested.
#[derive(Deserialize, Serialize, Parser, TS)]
#[ts(export)]
pub struct InstalledVersionParams {
/// Package to report the installed version of.
id: PackageId,
}
/// Returns the version of package `id` currently recorded in `package-data`,
/// or `None` if the package is not present in the database.
///
/// Reads the version out of the package's state-info manifest using
/// `ManifestPreference::Old` (presumably the pre-update manifest — TODO
/// confirm); `de()?` deserializes the stored version value.
pub async fn installed_version(
ctx: RpcContext,
InstalledVersionParams { id }: InstalledVersionParams,
) -> Result<Option<VersionString>, Error> {
// Peek a read-only snapshot of the db and index into package-data by id.
if let Some(pde) = ctx
.db
.peek()
.await
.into_public()
.into_package_data()
.into_idx(&id)
{
Ok(Some(
pde.into_state_info()
.as_manifest(ManifestPreference::Old)
.as_version()
.de()?,
))
} else {
Ok(None)
}
}
#[instrument(skip_all)]
pub async fn cli_install(
HandlerArgs {
@@ -368,7 +365,7 @@ pub async fn cli_install(
let method = parent_method.into_iter().chain(method).collect_vec();
match params {
CliInstallParams::Sideload(path) => {
let file = crate::s9pk::load(&ctx, path).await?;
let file = open_file(path).await?;
// rpc call remote sideload
let SideloadResponse { upload, progress } = from_value::<SideloadResponse>(
@@ -435,9 +432,70 @@ pub async fn cli_install(
progress?;
upload?;
}
CliInstallParams::Marketplace(params) => {
ctx.call_remote::<RpcContext>(&method.join("."), to_value(&params)?)
.await?;
CliInstallParams::Marketplace(QueryPackageParams { id, version }) => {
let source_version: Option<VersionString> = from_value(
ctx.call_remote::<RpcContext>("package.installed-version", json!({ "id": &id }))
.await?,
)?;
let mut packages: GetPackageResponse = from_value(
ctx.call_remote::<RegistryContext>(
"package.get",
json!({ "id": &id, "version": version, "sourceVersion": source_version }),
)
.await?,
)?;
let version = if packages.best.len() == 1 {
packages.best.pop_first().map(|(k, _)| k).unwrap()
} else {
println!("Multiple flavors of {id} found. Please select one of the following versions to install:");
let version;
loop {
let (mut read, mut output) = rustyline_async::Readline::new("> ".into())
.with_kind(ErrorKind::Filesystem)?;
for (idx, version) in packages.best.keys().enumerate() {
output
.write_all(format!(" {}) {}\n", idx + 1, version).as_bytes())
.await?;
read.add_history_entry(version.to_string());
}
if let ReadlineEvent::Line(line) = read.readline().await? {
let trimmed = line.trim();
match trimmed.parse() {
Ok(v) => {
if let Some((k, _)) = packages.best.remove_entry(&v) {
version = k;
break;
}
}
Err(_) => match trimmed.parse::<usize>() {
Ok(i) if (1..=packages.best.len()).contains(&i) => {
version = packages.best.keys().nth(i - 1).unwrap().clone();
break;
}
_ => (),
},
}
eprintln!("invalid selection: {trimmed}");
println!("Please select one of the following versions to install:");
} else {
return Err(Error::new(
eyre!("Could not determine precise version to install"),
ErrorKind::InvalidRequest,
)
.into());
}
}
version
};
ctx.call_remote::<RpcContext>(
&method.join("."),
to_value(&InstallParams {
id,
registry: ctx.registry_url.clone().or_not_found("--registry")?,
version,
})?,
)
.await?;
}
}
Ok(())

View File

@@ -1,4 +1,4 @@
pub const DEFAULT_MARKETPLACE: &str = "https://registry.start9.com";
pub const DEFAULT_REGISTRY: &str = "https://registry.start9.com";
// pub const COMMUNITY_MARKETPLACE: &str = "https://community-registry.start9.com";
pub const HOST_IP: [u8; 4] = [172, 18, 0, 1];
pub use std::env::consts::ARCH;
@@ -263,6 +263,12 @@ pub fn package<C: Context>() -> ParentHandler<C> {
.with_display_serializable()
.with_call_remote::<CliContext>(),
)
.subcommand(
"installed-version",
from_fn_async(install::installed_version)
.with_display_serializable()
.with_call_remote::<CliContext>(),
)
.subcommand("config", config::config::<C>())
.subcommand(
"start",

View File

@@ -12,7 +12,6 @@ use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{GenericRpcMethod, RpcRequest, RpcResponse};
use rustyline_async::{ReadlineEvent, SharedWriter};
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex;
@@ -30,6 +29,7 @@ use crate::disk::mount::util::unmount;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::util::clap::FromStrParser;
use crate::util::io::open_file;
use crate::util::rpc_client::UnixRpcClient;
use crate::util::{new_guid, Invoke};
@@ -342,7 +342,7 @@ impl Drop for LxcContainer {
if let Err(e) = async {
let err_path = rootfs.path().join("var/log/containerRuntime.err");
if tokio::fs::metadata(&err_path).await.is_ok() {
let mut lines = BufReader::new(File::open(&err_path).await?).lines();
let mut lines = BufReader::new(open_file(&err_path).await?).lines();
while let Some(line) = lines.next_line().await? {
let container = &**guid;
tracing::error!(container, "{}", line);

View File

@@ -5,8 +5,6 @@ use models::{HostId, ServiceInterfaceId};
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use crate::net::host::address::HostAddress;
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[ts(export)]
#[serde(rename_all = "camelCase")]

View File

@@ -19,7 +19,6 @@ use new_mime_guess::MimeGuess;
use openssl::hash::MessageDigest;
use openssl::x509::X509;
use rpc_toolkit::{Context, HttpServer, Server};
use tokio::fs::File;
use tokio::io::BufReader;
use tokio_util::io::ReaderStream;
@@ -29,6 +28,7 @@ use crate::middleware::auth::{Auth, HasValidSession};
use crate::middleware::cors::Cors;
use crate::middleware::db::SyncDb;
use crate::rpc_continuations::{Guid, RpcContinuations};
use crate::util::io::open_file;
use crate::{
diagnostic_api, init_api, install_api, main_api, setup_api, Error, ErrorKind, ResultExt,
};
@@ -44,8 +44,6 @@ const EMBEDDED_UIS: Dir<'_> =
#[cfg(not(all(feature = "daemon", not(feature = "test"))))]
const EMBEDDED_UIS: Dir<'_> = Dir::new("", &[]);
const PROXY_STRIP_HEADERS: &[&str] = &["cookie", "host", "origin", "referer", "user-agent"];
#[derive(Clone)]
pub enum UiMode {
Setup,
@@ -340,9 +338,8 @@ impl FileData {
.any(|e| e == "gzip")
.then_some("gzip");
let file = File::open(path)
.await
.with_ctx(|_| (ErrorKind::Filesystem, path.display().to_string()))?;
let file = open_file(path)
.await?;
let metadata = file
.metadata()
.await

View File

@@ -74,6 +74,7 @@ pub async fn list(
.as_notifications()
.as_entries()?
.into_iter()
.rev()
.take(limit);
let notifs = records
.into_iter()
@@ -97,6 +98,7 @@ pub async fn list(
.as_entries()?
.into_iter()
.filter(|(id, _)| *id < before)
.rev()
.take(limit);
records
.into_iter()

View File

@@ -21,7 +21,7 @@ use crate::disk::OsPartitionInfo;
use crate::net::utils::find_eth_iface;
use crate::prelude::*;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::util::io::TmpDir;
use crate::util::io::{open_file, TmpDir};
use crate::util::serde::IoFormat;
use crate::util::Invoke;
use crate::ARCH;
@@ -241,12 +241,10 @@ pub async fn execute<C: Context>(
tokio::fs::create_dir_all(&images_path).await?;
let image_path = images_path
.join(hex::encode(
&MultiCursorFile::from(
tokio::fs::File::open("/run/live/medium/live/filesystem.squashfs").await?,
)
.blake3_mmap()
.await?
.as_bytes()[..16],
&MultiCursorFile::from(open_file("/run/live/medium/live/filesystem.squashfs").await?)
.blake3_mmap()
.await?
.as_bytes()[..16],
))
.with_extension("rootfs");
tokio::fs::copy("/run/live/medium/live/filesystem.squashfs", &image_path).await?;

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use reqwest::Client;
use serde::{Deserialize, Serialize};
@@ -7,10 +8,13 @@ use ts_rs::TS;
use url::Url;
use crate::prelude::*;
use crate::registry::signer::commitment::merkle_archive::MerkleArchiveCommitment;
use crate::registry::signer::commitment::{Commitment, Digestable};
use crate::registry::signer::sign::{AnySignature, AnyVerifyingKey};
use crate::registry::signer::AcceptSigners;
use crate::s9pk::merkle_archive::source::http::HttpSource;
use crate::s9pk::merkle_archive::source::Section;
use crate::s9pk::S9pk;
#[derive(Debug, Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
@@ -52,3 +56,15 @@ impl<C: for<'a> Commitment<&'a HttpSource>> RegistryAsset<C> {
.await
}
}
impl RegistryAsset<MerkleArchiveCommitment> {
/// Fetches the s9pk referenced by this registry asset over HTTP (from
/// `self.url`) and deserializes it, passing this asset's merkle-archive
/// commitment so the archive is checked against it during deserialization.
pub async fn deserialize_s9pk(
&self,
client: Client,
) -> Result<S9pk<Section<Arc<HttpSource>>>, Error> {
S9pk::deserialize(
&Arc::new(HttpSource::new(client, self.url.clone()).await?),
Some(&self.commitment),
)
.await
}
}

View File

@@ -4,7 +4,7 @@ use std::ops::Deref;
use axum::extract::Request;
use axum::response::Response;
use emver::{Version, VersionRange};
use exver::{Version, VersionRange};
use http::HeaderValue;
use imbl_value::InternedString;
use rpc_toolkit::{Middleware, RpcRequest, RpcResponse};

View File

@@ -1,5 +1,4 @@
use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use axum::Router;
use futures::future::ready;

View File

@@ -3,7 +3,6 @@ use std::panic::UnwindSafe;
use std::path::PathBuf;
use clap::Parser;
use helpers::NonDetachingJoinHandle;
use imbl_value::InternedString;
use itertools::Itertools;
use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler};
@@ -13,7 +12,7 @@ use url::Url;
use crate::context::CliContext;
use crate::prelude::*;
use crate::progress::{FullProgressTracker, PhasedProgressBar};
use crate::progress::{FullProgressTracker};
use crate::registry::asset::RegistryAsset;
use crate::registry::context::RegistryContext;
use crate::registry::os::index::OsVersionInfo;
@@ -25,6 +24,7 @@ use crate::s9pk::merkle_archive::hash::VerifyingWriter;
use crate::s9pk::merkle_archive::source::http::HttpSource;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::ArchiveSource;
use crate::util::io::open_file;
use crate::util::serde::Base64;
use crate::util::VersionString;
@@ -184,7 +184,7 @@ pub async fn cli_add_asset(
}
};
let file = MultiCursorFile::from(tokio::fs::File::open(&path).await?);
let file = MultiCursorFile::from(open_file(&path).await?);
let progress = FullProgressTracker::new();
let mut sign_phase = progress.add_phase(InternedString::intern("Signing File"), Some(10));

View File

@@ -20,6 +20,7 @@ use crate::registry::os::SIG_CONTEXT;
use crate::registry::signer::commitment::blake3::Blake3Commitment;
use crate::registry::signer::commitment::Commitment;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::util::io::open_file;
use crate::util::VersionString;
pub fn get_api<C: Context>() -> ParentHandler<C> {
@@ -158,9 +159,7 @@ async fn cli_get_os_asset(
if let Some(mut reverify_phase) = reverify_phase {
reverify_phase.start();
res.commitment
.check(&MultiCursorFile::from(
tokio::fs::File::open(download).await?,
))
.check(&MultiCursorFile::from(open_file(download).await?))
.await?;
reverify_phase.complete();
}

View File

@@ -21,6 +21,7 @@ use crate::registry::signer::sign::ed25519::Ed25519;
use crate::registry::signer::sign::{AnySignature, AnyVerifyingKey, SignatureScheme};
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::ArchiveSource;
use crate::util::io::open_file;
use crate::util::serde::Base64;
use crate::util::VersionString;
@@ -166,7 +167,7 @@ pub async fn cli_sign_asset(
}
};
let file = MultiCursorFile::from(tokio::fs::File::open(&path).await?);
let file = MultiCursorFile::from(open_file(&path).await?);
let progress = FullProgressTracker::new();
let mut sign_phase = progress.add_phase(InternedString::intern("Signing File"), Some(10));

View File

@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use emver::VersionRange;
use exver::VersionRange;
use imbl_value::InternedString;
use serde::{Deserialize, Serialize};
use ts_rs::TS;

View File

@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use chrono::Utc;
use clap::Parser;
use emver::VersionRange;
use exver::VersionRange;
use itertools::Itertools;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
@@ -148,10 +148,11 @@ pub async fn get_version(
if let (Some(pool), Some(server_id), Some(arch)) = (&ctx.pool, server_id, arch) {
let created_at = Utc::now();
query!("INSERT INTO user_activity (created_at, server_id, arch) VALUES ($1, $2, $3)",
created_at,
server_id,
arch
query!(
"INSERT INTO user_activity (created_at, server_id, arch) VALUES ($1, $2, $3)",
created_at,
server_id,
arch
)
.execute(pool)
.await?;

View File

@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use clap::{Parser, ValueEnum};
use emver::{Version, VersionRange};
use exver::{ExtendedVersion, VersionRange};
use imbl_value::InternedString;
use itertools::Itertools;
use models::PackageId;
@@ -45,8 +45,7 @@ pub struct GetPackageParams {
pub id: Option<PackageId>,
#[ts(type = "string | null")]
pub version: Option<VersionRange>,
#[ts(type = "string | null")]
pub source_version: Option<Version>,
pub source_version: Option<VersionString>,
#[ts(skip)]
#[arg(skip)]
#[serde(rename = "__device_info")]
@@ -132,7 +131,7 @@ fn get_matching_models<'a>(
device_info,
..
}: &GetPackageParams,
) -> Result<Vec<(PackageId, Version, &'a Model<PackageVersionInfo>)>, Error> {
) -> Result<Vec<(PackageId, ExtendedVersion, &'a Model<PackageVersionInfo>)>, Error> {
if let Some(id) = id {
if let Some(pkg) = db.as_packages().as_idx(id) {
vec![(id.clone(), pkg)]
@@ -166,7 +165,7 @@ fn get_matching_models<'a>(
.as_ref()
.map_or(Ok(true), |device_info| info.works_for_device(device_info))?
{
Some((k.clone(), Version::from(v), info))
Some((k.clone(), ExtendedVersion::from(v), info))
} else {
None
},

View File

@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use emver::{Version, VersionRange};
use exver::{Version, VersionRange};
use imbl_value::InternedString;
use models::{DataUrl, PackageId, VersionString};
use serde::{Deserialize, Serialize};
@@ -70,7 +70,8 @@ pub struct PackageVersionInfo {
pub support_site: Url,
#[ts(type = "string")]
pub marketing_site: Url,
pub os_version: VersionString,
#[ts(type = "string")]
pub os_version: Version,
pub hardware_requirements: HardwareRequirements,
#[ts(type = "string | null")]
pub source_version: Option<VersionRange>,

View File

@@ -4,7 +4,6 @@ use std::str::FromStr;
use ::ed25519::pkcs8::BitStringRef;
use clap::builder::ValueParserFactory;
use der::referenced::OwnedToRef;
use der::{Decode, Encode};
use pkcs8::der::AnyRef;
use pkcs8::{PrivateKeyInfo, SubjectPublicKeyInfo};
use serde::{Deserialize, Serialize};

View File

@@ -274,6 +274,21 @@ impl<S: FileSource + Clone> DirectoryContents<S> {
((_, a), (_, b), _) if !a.as_contents().is_dir() && b.as_contents().is_dir() => {
std::cmp::Ordering::Greater
}
((_, a), (_, b), _)
if a.as_contents().is_missing() && !b.as_contents().is_missing() =>
{
std::cmp::Ordering::Greater
}
((_, a), (_, b), _)
if !a.as_contents().is_missing() && b.as_contents().is_missing() =>
{
std::cmp::Ordering::Less
}
((n_a, a), (n_b, b), _)
if a.as_contents().is_missing() && b.as_contents().is_missing() =>
{
n_a.cmp(n_b)
}
((a, _), (b, _), Some(sort_by)) => sort_by(&***a, &***b),
_ => std::cmp::Ordering::Equal,
}) {

View File

@@ -121,14 +121,14 @@ impl<S: ArchiveSource + Clone> MerkleArchive<Section<S>> {
}
if max_size > *root_maxsize {
return Err(Error::new(
eyre!("merkle root directory max size too large"),
eyre!("root directory max size too large"),
ErrorKind::InvalidSignature,
));
}
} else {
if max_size > CAP_1_MiB as u64 {
return Err(Error::new(
eyre!("merkle root directory max size over 1MiB, cancelling download in case of DOS attack"),
eyre!("root directory max size over 1MiB, cancelling download in case of DOS attack"),
ErrorKind::InvalidSignature,
));
}
@@ -377,6 +377,9 @@ impl<S> EntryContents<S> {
pub fn is_dir(&self) -> bool {
matches!(self, &EntryContents::Directory(_))
}
/// Returns true iff this entry is the `Missing` variant (placeholder for
/// contents that were filtered out or not downloaded).
pub fn is_missing(&self) -> bool {
matches!(self, &EntryContents::Missing)
}
}
impl<S: ArchiveSource + Clone> EntryContents<Section<S>> {
#[instrument(skip_all)]

View File

@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
use std::task::Poll;
use bytes::Bytes;
use futures::{Stream, StreamExt, TryStreamExt};
use futures::{Stream, TryStreamExt};
use reqwest::header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE};
use reqwest::{Client, Url};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
@@ -54,11 +54,12 @@ impl HttpSource {
}
}
impl ArchiveSource for HttpSource {
type Reader = HttpReader;
type FetchReader = HttpReader;
type FetchAllReader = StreamReader<BoxStream<'static, Result<Bytes, std::io::Error>>, Bytes>;
async fn size(&self) -> Option<u64> {
self.size
}
async fn fetch_all(&self) -> Result<impl AsyncRead + Unpin + Send, Error> {
async fn fetch_all(&self) -> Result<Self::FetchAllReader, Error> {
Ok(StreamReader::new(
self.client
.get(self.url.clone())
@@ -72,7 +73,7 @@ impl ArchiveSource for HttpSource {
.apply(boxed),
))
}
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
async fn fetch(&self, position: u64, size: u64) -> Result<Self::FetchReader, Error> {
match &self.range_support {
Ok(_) => Ok(HttpReader::Range(
StreamReader::new(if size > 0 {

View File

@@ -10,6 +10,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::prelude::*;
use crate::s9pk::merkle_archive::hash::VerifyingWriter;
use crate::util::io::{open_file, TmpDir};
pub mod http;
pub mod multi_cursor_file;
@@ -159,7 +160,7 @@ impl FileSource for PathBuf {
Ok(tokio::fs::metadata(self).await?.len())
}
async fn reader(&self) -> Result<Self::Reader, Error> {
Ok(File::open(self).await?)
Ok(open_file(self).await?)
}
}
@@ -180,18 +181,17 @@ impl FileSource for Arc<[u8]> {
}
pub trait ArchiveSource: Send + Sync + Sized + 'static {
type Reader: AsyncRead + Unpin + Send;
type FetchReader: AsyncRead + Unpin + Send;
type FetchAllReader: AsyncRead + Unpin + Send;
fn size(&self) -> impl Future<Output = Option<u64>> + Send {
async { None }
}
fn fetch_all(
&self,
) -> impl Future<Output = Result<impl AsyncRead + Unpin + Send, Error>> + Send;
fn fetch_all(&self) -> impl Future<Output = Result<Self::FetchAllReader, Error>> + Send;
fn fetch(
&self,
position: u64,
size: u64,
) -> impl Future<Output = Result<Self::Reader, Error>> + Send;
) -> impl Future<Output = Result<Self::FetchReader, Error>> + Send;
fn copy_all_to<W: AsyncWrite + Unpin + Send + ?Sized>(
&self,
w: &mut W,
@@ -222,14 +222,15 @@ pub trait ArchiveSource: Send + Sync + Sized + 'static {
}
impl<T: ArchiveSource> ArchiveSource for Arc<T> {
type Reader = T::Reader;
type FetchReader = T::FetchReader;
type FetchAllReader = T::FetchAllReader;
async fn size(&self) -> Option<u64> {
self.deref().size().await
}
async fn fetch_all(&self) -> Result<impl AsyncRead + Unpin + Send, Error> {
async fn fetch_all(&self) -> Result<Self::FetchAllReader, Error> {
self.deref().fetch_all().await
}
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
async fn fetch(&self, position: u64, size: u64) -> Result<Self::FetchReader, Error> {
self.deref().fetch(position, size).await
}
async fn copy_all_to<W: AsyncWrite + Unpin + Send + ?Sized>(
@@ -249,11 +250,12 @@ impl<T: ArchiveSource> ArchiveSource for Arc<T> {
}
impl ArchiveSource for Arc<[u8]> {
type Reader = tokio::io::Take<std::io::Cursor<Self>>;
async fn fetch_all(&self) -> Result<impl AsyncRead + Unpin + Send, Error> {
type FetchReader = tokio::io::Take<std::io::Cursor<Self>>;
type FetchAllReader = std::io::Cursor<Self>;
async fn fetch_all(&self) -> Result<Self::FetchAllReader, Error> {
Ok(std::io::Cursor::new(self.clone()))
}
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
async fn fetch(&self, position: u64, size: u64) -> Result<Self::FetchReader, Error> {
use tokio::io::AsyncReadExt;
let mut cur = std::io::Cursor::new(self.clone());
@@ -269,7 +271,7 @@ pub struct Section<S> {
size: u64,
}
impl<S: ArchiveSource> FileSource for Section<S> {
type Reader = S::Reader;
type Reader = S::FetchReader;
async fn size(&self) -> Result<u64, Error> {
Ok(self.size)
}
@@ -285,3 +287,81 @@ pub type DynRead = Box<dyn AsyncRead + Unpin + Send + Sync + 'static>;
pub fn into_dyn_read<R: AsyncRead + Unpin + Send + Sync + 'static>(r: R) -> DynRead {
Box::new(r)
}
/// A source wrapper that pairs a file/archive source with the temporary
/// directory backing it, keeping the `TmpDir` alive (via `Arc`) for as long
/// as any clone of the source exists.
#[derive(Clone)]
pub struct TmpSource<S> {
// Shared handle to the scratch directory holding the source's on-disk data.
tmp_dir: Arc<TmpDir>,
// The wrapped source; all I/O traits delegate to it.
source: S,
}
impl<S> TmpSource<S> {
/// Wraps `source`, tying its lifetime to the given temporary directory.
pub fn new(tmp_dir: Arc<TmpDir>, source: S) -> Self {
Self { tmp_dir, source }
}
/// Consumes the wrapper and garbage-collects the backing temporary
/// directory (delegates to `TmpDir::gc`).
pub async fn gc(self) -> Result<(), Error> {
self.tmp_dir.gc().await
}
}
// Transparent read access to the wrapped source.
impl<S> std::ops::Deref for TmpSource<S> {
type Target = S;
fn deref(&self) -> &Self::Target {
&self.source
}
}
// Pure delegation: every `ArchiveSource` operation forwards to the inner
// source. The wrapper adds no behavior; it exists only so `tmp_dir` stays
// alive alongside the source.
impl<S: ArchiveSource> ArchiveSource for TmpSource<S> {
type FetchReader = <S as ArchiveSource>::FetchReader;
type FetchAllReader = <S as ArchiveSource>::FetchAllReader;
async fn size(&self) -> Option<u64> {
self.source.size().await
}
async fn fetch_all(&self) -> Result<Self::FetchAllReader, Error> {
self.source.fetch_all().await
}
async fn fetch(&self, position: u64, size: u64) -> Result<Self::FetchReader, Error> {
self.source.fetch(position, size).await
}
async fn copy_all_to<W: AsyncWrite + Unpin + Send + ?Sized>(
&self,
w: &mut W,
) -> Result<(), Error> {
self.source.copy_all_to(w).await
}
async fn copy_to<W: AsyncWrite + Unpin + Send + ?Sized>(
&self,
position: u64,
size: u64,
w: &mut W,
) -> Result<(), Error> {
self.source.copy_to(position, size, w).await
}
}
// Allows type-erasing a `TmpSource` into a `DynFileSource`.
impl<S: FileSource> From<TmpSource<S>> for DynFileSource {
fn from(value: TmpSource<S>) -> Self {
DynFileSource::new(value)
}
}
// Pure delegation of `FileSource` to the inner source; see the
// `ArchiveSource` impl above for the rationale.
impl<S: FileSource> FileSource for TmpSource<S> {
type Reader = <S as FileSource>::Reader;
async fn size(&self) -> Result<u64, Error> {
self.source.size().await
}
async fn reader(&self) -> Result<Self::Reader, Error> {
self.source.reader().await
}
async fn copy<W: AsyncWrite + Unpin + Send + ?Sized>(
&self,
mut w: &mut W,
) -> Result<(), Error> {
self.source.copy(&mut w).await
}
// `verify` is an optional (hash, length) pair checked while copying.
async fn copy_verify<W: AsyncWrite + Unpin + Send + ?Sized>(
&self,
mut w: &mut W,
verify: Option<(Hash, u64)>,
) -> Result<(), Error> {
self.source.copy_verify(&mut w, verify).await
}
async fn to_vec(&self, verify: Option<(Hash, u64)>) -> Result<Vec<u8>, Error> {
self.source.to_vec(verify).await
}
}

View File

@@ -6,12 +6,13 @@ use std::sync::Arc;
use std::task::Poll;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, ReadBuf, Take};
use tokio::sync::{Mutex, OwnedMutexGuard};
use crate::disk::mount::filesystem::loop_dev::LoopDev;
use crate::prelude::*;
use crate::s9pk::merkle_archive::source::{ArchiveSource, Section};
use crate::util::io::open_file;
fn path_from_fd(fd: RawFd) -> Result<PathBuf, Error> {
#[cfg(target_os = "linux")]
@@ -42,7 +43,7 @@ impl MultiCursorFile {
path_from_fd(self.fd)
}
pub async fn open(fd: &impl AsRawFd) -> Result<Self, Error> {
let f = File::open(path_from_fd(fd.as_raw_fd())?).await?;
let f = open_file(path_from_fd(fd.as_raw_fd())?).await?;
Ok(Self::from(f))
}
pub async fn cursor(&self) -> Result<FileCursor, Error> {
@@ -50,7 +51,7 @@ impl MultiCursorFile {
if let Ok(file) = self.file.clone().try_lock_owned() {
file
} else {
Arc::new(Mutex::new(File::open(self.path()?).await?))
Arc::new(Mutex::new(open_file(self.path()?).await?))
.try_lock_owned()
.expect("freshly created")
},
@@ -88,24 +89,48 @@ impl AsyncRead for FileCursor {
Pin::new(&mut (&mut **this.0.get_mut())).poll_read(cx, buf)
}
}
// Forwards seek operations to the inner `File` (field 0) through a pin
// projection; the double deref unwraps the owned guard around the file.
impl AsyncSeek for FileCursor {
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
let this = self.project();
Pin::new(&mut (&mut **this.0.get_mut())).start_seek(position)
}
fn poll_complete(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<u64>> {
let this = self.project();
Pin::new(&mut (&mut **this.0.get_mut())).poll_complete(cx)
}
}
// Read access to the underlying `File` held by the cursor.
impl std::ops::Deref for FileCursor {
type Target = File;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
// Mutable access to the underlying `File` held by the cursor.
impl std::ops::DerefMut for FileCursor {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.0
}
}
impl ArchiveSource for MultiCursorFile {
type Reader = Take<FileCursor>;
type FetchReader = Take<FileCursor>;
type FetchAllReader = FileCursor;
async fn size(&self) -> Option<u64> {
tokio::fs::metadata(self.path().ok()?)
.await
.ok()
.map(|m| m.len())
}
#[allow(refining_impl_trait)]
async fn fetch_all(&self) -> Result<impl AsyncRead + Unpin + Send + 'static, Error> {
async fn fetch_all(&self) -> Result<Self::FetchAllReader, Error> {
use tokio::io::AsyncSeekExt;
let mut file = self.cursor().await?;
file.0.seek(SeekFrom::Start(0)).await?;
Ok(file)
}
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
async fn fetch(&self, position: u64, size: u64) -> Result<Self::FetchReader, Error> {
use tokio::io::AsyncSeekExt;
let mut file = self.cursor().await?;

View File

@@ -3,7 +3,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::prelude::*;
/// Most-significant byte, == 0x80
/// Most-significant bit, == 0x80
pub const MSB: u8 = 0b1000_0000;
const MAX_STR_LEN: u64 = 1024 * 1024; // 1 MiB
@@ -39,22 +39,20 @@ pub async fn serialize_varstring<W: AsyncWrite + Unpin + Send>(
Ok(())
}
const MAX_SIZE: usize = (std::mem::size_of::<u64>() * 8 + 7) / 7;
#[derive(Default)]
struct VarIntProcessor {
buf: [u8; 10],
maxsize: usize,
buf: [u8; MAX_SIZE],
i: usize,
}
impl VarIntProcessor {
fn new() -> VarIntProcessor {
VarIntProcessor {
maxsize: (std::mem::size_of::<u64>() * 8 + 7) / 7,
..VarIntProcessor::default()
}
Self::default()
}
fn push(&mut self, b: u8) -> Result<(), Error> {
if self.i >= self.maxsize {
if self.i >= MAX_SIZE {
return Err(Error::new(
eyre!("Unterminated varint"),
ErrorKind::ParseS9pk,

View File

@@ -4,37 +4,57 @@ pub mod rpc;
pub mod v1;
pub mod v2;
use std::io::SeekFrom;
use std::path::Path;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::io::{AsyncReadExt, AsyncSeek};
pub use v2::{manifest, S9pk};
use crate::context::CliContext;
use crate::prelude::*;
use crate::progress::FullProgressTracker;
use crate::s9pk::merkle_archive::source::{ArchiveSource, DynFileSource};
use crate::s9pk::v1::reader::S9pkReader;
use crate::s9pk::v2::compat::MAGIC_AND_VERSION;
use crate::util::io::TmpDir;
pub async fn load(ctx: &CliContext, path: impl AsRef<Path>) -> Result<File, Error> {
pub async fn load<S, K>(
source: S,
key: K,
progress: Option<&FullProgressTracker>,
) -> Result<S9pk<DynFileSource>, Error>
where
S: ArchiveSource,
S::FetchAllReader: AsyncSeek + Sync,
K: FnOnce() -> Result<ed25519_dalek::SigningKey, Error>,
{
// TODO: return s9pk
const MAGIC_LEN: usize = MAGIC_AND_VERSION.len();
let mut magic = [0_u8; MAGIC_LEN];
let mut file = tokio::fs::File::open(&path).await?;
file.read_exact(&mut magic).await?;
file.seek(SeekFrom::Start(0)).await?;
source.fetch(0, 3).await?.read_exact(&mut magic).await?;
if magic == v2::compat::MAGIC_AND_VERSION {
let phase = if let Some(progress) = progress {
let mut phase = progress.add_phase(
"Converting Package to V2".into(),
Some(source.size().await.unwrap_or(60)),
);
phase.start();
Some(phase)
} else {
None
};
tracing::info!("Converting package to v2 s9pk");
let new_path = path.as_ref().with_extension("compat.s9pk");
S9pk::from_v1(
S9pkReader::from_reader(file, true).await?,
&new_path,
ctx.developer_key()?.clone(),
let tmp_dir = TmpDir::new().await?;
let s9pk = S9pk::from_v1(
S9pkReader::from_reader(source.fetch_all().await?, true).await?,
Arc::new(tmp_dir),
key()?,
)
.await?;
tokio::fs::rename(&new_path, &path).await?;
file = tokio::fs::File::open(&path).await?;
tracing::info!("Converted s9pk successfully");
if let Some(mut phase) = phase {
phase.complete();
}
Ok(s9pk.into_dyn())
} else {
Ok(S9pk::deserialize(&Arc::new(source), None).await?.into_dyn())
}
Ok(file)
}

View File

@@ -1,19 +1,19 @@
use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use models::ImageId;
use rpc_toolkit::{from_fn_async, Empty, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use ts_rs::TS;
use crate::context::CliContext;
use crate::prelude::*;
use crate::s9pk::manifest::Manifest;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::v2::pack::ImageConfig;
use crate::s9pk::v2::SIG_CONTEXT;
use crate::s9pk::S9pk;
use crate::util::io::TmpDir;
use crate::util::io::{create_file, open_file, TmpDir};
use crate::util::serde::{apply_expr, HandlerExtSerde};
pub const SKIP_ENV: &[&str] = &["TERM", "container", "HOME", "HOSTNAME"];
@@ -79,19 +79,25 @@ async fn add_image(
AddImageParams { id, config }: AddImageParams,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<(), Error> {
let mut s9pk = S9pk::from_file(super::load(&ctx, &s9pk_path).await?)
.await?
.into_dyn();
let mut s9pk = super::load(
MultiCursorFile::from(open_file(&s9pk_path).await?),
|| ctx.developer_key().cloned(),
None,
)
.await?;
s9pk.as_manifest_mut().images.insert(id, config);
let tmpdir = TmpDir::new().await?;
s9pk.load_images(&tmpdir).await?;
let tmp_dir = Arc::new(TmpDir::new().await?);
s9pk.load_images(tmp_dir.clone()).await?;
s9pk.validate_and_filter(None)?;
let tmp_path = s9pk_path.with_extension("s9pk.tmp");
let mut tmp_file = File::create(&tmp_path).await?;
let mut tmp_file = create_file(&tmp_path).await?;
s9pk.serialize(&mut tmp_file, true).await?;
drop(s9pk);
tmp_file.sync_all().await?;
tokio::fs::rename(&tmp_path, &s9pk_path).await?;
tmp_dir.gc().await?;
Ok(())
}
@@ -104,13 +110,18 @@ async fn edit_manifest(
EditManifestParams { expression }: EditManifestParams,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<Manifest, Error> {
let mut s9pk = S9pk::from_file(super::load(&ctx, &s9pk_path).await?).await?;
let mut s9pk = super::load(
MultiCursorFile::from(open_file(&s9pk_path).await?),
|| ctx.developer_key().cloned(),
None,
)
.await?;
let old = serde_json::to_value(s9pk.as_manifest()).with_kind(ErrorKind::Serialization)?;
*s9pk.as_manifest_mut() = serde_json::from_value(apply_expr(old.into(), &expression)?.into())
.with_kind(ErrorKind::Serialization)?;
let manifest = s9pk.as_manifest().clone();
let tmp_path = s9pk_path.with_extension("s9pk.tmp");
let mut tmp_file = File::create(&tmp_path).await?;
let mut tmp_file = create_file(&tmp_path).await?;
s9pk.as_archive_mut()
.set_signer(ctx.developer_key()?.clone(), SIG_CONTEXT);
s9pk.serialize(&mut tmp_file, true).await?;
@@ -123,9 +134,14 @@ async fn edit_manifest(
async fn file_tree(
ctx: CliContext,
_: Empty,
S9pkPath { s9pk }: S9pkPath,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<Vec<PathBuf>, Error> {
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?).await?;
let s9pk = super::load(
MultiCursorFile::from(open_file(&s9pk_path).await?),
|| ctx.developer_key().cloned(),
None,
)
.await?;
Ok(s9pk.as_archive().contents().file_paths(""))
}
@@ -138,11 +154,16 @@ struct CatParams {
async fn cat(
ctx: CliContext,
CatParams { file_path }: CatParams,
S9pkPath { s9pk }: S9pkPath,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<(), Error> {
use crate::s9pk::merkle_archive::source::FileSource;
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?).await?;
let s9pk = super::load(
MultiCursorFile::from(open_file(&s9pk_path).await?),
|| ctx.developer_key().cloned(),
None,
)
.await?;
tokio::io::copy(
&mut s9pk
.as_archive()
@@ -162,8 +183,13 @@ async fn cat(
async fn inspect_manifest(
ctx: CliContext,
_: Empty,
S9pkPath { s9pk }: S9pkPath,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<Manifest, Error> {
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?).await?;
let s9pk = super::load(
MultiCursorFile::from(open_file(&s9pk_path).await?),
|| ctx.developer_key().cloned(),
None,
)
.await?;
Ok(s9pk.as_manifest().clone())
}

View File

@@ -1,8 +1,7 @@
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use emver::VersionRange;
use imbl_value::InOMap;
use exver::{Version, VersionRange};
use indexmap::IndexMap;
pub use models::PackageId;
use models::{ActionId, HealthCheckId, ImageId, VolumeId};
@@ -13,23 +12,16 @@ use crate::prelude::*;
use crate::s9pk::git_hash::GitHash;
use crate::s9pk::manifest::{Alerts, Description, HardwareRequirements};
use crate::util::serde::{Duration, IoFormat};
use crate::util::VersionString;
use crate::version::{Current, VersionT};
fn current_version() -> VersionString {
Current::new().semver().into()
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct Manifest {
#[serde(default = "current_version")]
pub eos_version: VersionString,
pub eos_version: Version,
pub id: PackageId,
#[serde(default)]
pub git_hash: Option<GitHash>,
pub title: String,
pub version: VersionString,
pub version: exver::emver::Version,
pub description: Description,
#[serde(default)]
pub assets: Assets,

View File

@@ -20,6 +20,7 @@ use super::header::{FileSection, Header, TableOfContents};
use super::SIG_CONTEXT;
use crate::prelude::*;
use crate::s9pk::v1::docker::DockerReader;
use crate::util::io::open_file;
use crate::util::VersionString;
#[pin_project::pin_project]
@@ -150,9 +151,7 @@ pub struct S9pkReader<R: AsyncRead + AsyncSeek + Unpin + Send + Sync = BufReader
impl S9pkReader {
pub async fn open<P: AsRef<Path>>(path: P, check_sig: bool) -> Result<Self, Error> {
let p = path.as_ref();
let rdr = File::open(p)
.await
.with_ctx(|_| (crate::error::ErrorKind::Filesystem, p.display().to_string()))?;
let rdr = open_file(p).await?;
Self::from_reader(BufReader::new(rdr), check_sig).await
}

View File

@@ -2,9 +2,8 @@ use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;
use itertools::Itertools;
use exver::ExtendedVersion;
use models::ImageId;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWriteExt};
use tokio::process::Command;
@@ -12,29 +11,35 @@ use crate::dependencies::{DepInfo, Dependencies};
use crate::prelude::*;
use crate::s9pk::manifest::Manifest;
use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::Section;
use crate::s9pk::merkle_archive::source::TmpSource;
use crate::s9pk::merkle_archive::{Entry, MerkleArchive};
use crate::s9pk::rpc::SKIP_ENV;
use crate::s9pk::v1::manifest::{Manifest as ManifestV1, PackageProcedure};
use crate::s9pk::v1::reader::S9pkReader;
use crate::s9pk::v2::pack::{PackSource, CONTAINER_TOOL};
use crate::s9pk::v2::pack::{ImageSource, PackSource, CONTAINER_TOOL};
use crate::s9pk::v2::{S9pk, SIG_CONTEXT};
use crate::util::io::TmpDir;
use crate::util::io::{create_file, TmpDir};
use crate::util::Invoke;
pub const MAGIC_AND_VERSION: &[u8] = &[0x3b, 0x3b, 0x01];
impl S9pk<Section<MultiCursorFile>> {
impl S9pk<TmpSource<PackSource>> {
#[instrument(skip_all)]
pub async fn from_v1<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
mut reader: S9pkReader<R>,
destination: impl AsRef<Path>,
tmp_dir: Arc<TmpDir>,
signer: ed25519_dalek::SigningKey,
) -> Result<Self, Error> {
let scratch_dir = TmpDir::new().await?;
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg("--privileged")
.arg("tonistiigi/binfmt")
.arg("--install")
.arg("all")
.invoke(ErrorKind::Docker)
.await?;
let mut archive = DirectoryContents::<PackSource>::new();
let mut archive = DirectoryContents::<TmpSource<PackSource>>::new();
// manifest.json
let manifest_raw = reader.manifest().await?;
@@ -56,33 +61,35 @@ impl S9pk<Section<MultiCursorFile>> {
let license: Arc<[u8]> = reader.license().await?.to_vec().await?.into();
archive.insert_path(
"LICENSE.md",
Entry::file(PackSource::Buffered(license.into())),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(license.into()),
)),
)?;
// instructions.md
let instructions: Arc<[u8]> = reader.instructions().await?.to_vec().await?.into();
archive.insert_path(
"instructions.md",
Entry::file(PackSource::Buffered(instructions.into())),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(instructions.into()),
)),
)?;
// icon.md
let icon: Arc<[u8]> = reader.icon().await?.to_vec().await?.into();
archive.insert_path(
format!("icon.{}", manifest.assets.icon_type()),
Entry::file(PackSource::Buffered(icon.into())),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(icon.into()),
)),
)?;
// images
for arch in reader.docker_arches().await? {
let images_dir = scratch_dir.join("images").join(&arch);
let docker_platform = if arch == "x86_64" {
"--platform=linux/amd64".to_owned()
} else if arch == "aarch64" {
"--platform=linux/arm64".to_owned()
} else {
format!("--platform=linux/{arch}")
};
let images_dir = tmp_dir.join("images").join(&arch);
tokio::fs::create_dir_all(&images_dir).await?;
Command::new(CONTAINER_TOOL)
.arg("load")
@@ -93,97 +100,24 @@ impl S9pk<Section<MultiCursorFile>> {
let mut image_config = new_manifest.images.remove(image).unwrap_or_default();
image_config.arch.insert(arch.as_str().into());
new_manifest.images.insert(image.clone(), image_config);
let sqfs_path = images_dir.join(image).with_extension("squashfs");
let image_name = if *system {
format!("start9/{}:latest", image)
} else {
format!("start9/{}/{}:{}", manifest.id, image, manifest.version)
};
let id = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("create")
.arg(&docker_platform)
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?;
let env = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg(&docker_platform)
.arg("--entrypoint")
.arg("env")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?
.lines()
.filter(|l| {
l.trim()
.split_once("=")
.map_or(false, |(v, _)| !SKIP_ENV.contains(&v))
})
.join("\n")
+ "\n";
let workdir = Path::new(
String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg(&docker_platform)
.arg("--entrypoint")
.arg("pwd")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?
.trim(),
)
.to_owned();
Command::new("bash")
.arg("-c")
.arg(format!(
"{CONTAINER_TOOL} export {id} | mksquashfs - {sqfs} -tar",
id = id.trim(),
sqfs = sqfs_path.display()
))
.invoke(ErrorKind::Docker)
ImageSource::DockerTag(image_name.clone())
.load(
tmp_dir.clone(),
&new_manifest.id,
&new_manifest.version,
image,
&arch,
&mut archive,
)
.await?;
Command::new(CONTAINER_TOOL)
.arg("rm")
.arg(id.trim())
.invoke(ErrorKind::Docker)
.await?;
archive.insert_path(
Path::new("images")
.join(&arch)
.join(&image)
.with_extension("squashfs"),
Entry::file(PackSource::File(sqfs_path)),
)?;
archive.insert_path(
Path::new("images")
.join(&arch)
.join(&image)
.with_extension("env"),
Entry::file(PackSource::Buffered(Vec::from(env).into())),
)?;
archive.insert_path(
Path::new("images")
.join(&arch)
.join(&image)
.with_extension("json"),
Entry::file(PackSource::Buffered(
serde_json::to_vec(&serde_json::json!({
"workdir": workdir
}))
.with_kind(ErrorKind::Serialization)?
.into(),
)),
)?;
Command::new(CONTAINER_TOOL)
.arg("rmi")
.arg("-f")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?;
@@ -191,7 +125,7 @@ impl S9pk<Section<MultiCursorFile>> {
}
// assets
let asset_dir = scratch_dir.join("assets");
let asset_dir = tmp_dir.join("assets");
tokio::fs::create_dir_all(&asset_dir).await?;
tokio_tar::Archive::new(reader.assets().await?)
.unpack(&asset_dir)
@@ -212,21 +146,21 @@ impl S9pk<Section<MultiCursorFile>> {
Path::new("assets")
.join(&asset_id)
.with_extension("squashfs"),
Entry::file(PackSource::File(sqfs_path)),
Entry::file(TmpSource::new(tmp_dir.clone(), PackSource::File(sqfs_path))),
)?;
}
// javascript
let js_dir = scratch_dir.join("javascript");
let js_dir = tmp_dir.join("javascript");
let sqfs_path = js_dir.with_extension("squashfs");
tokio::fs::create_dir_all(&js_dir).await?;
if let Some(mut scripts) = reader.scripts().await? {
let mut js_file = File::create(js_dir.join("embassy.js")).await?;
let mut js_file = create_file(js_dir.join("embassy.js")).await?;
tokio::io::copy(&mut scripts, &mut js_file).await?;
js_file.sync_all().await?;
}
{
let mut js_file = File::create(js_dir.join("embassyManifest.json")).await?;
let mut js_file = create_file(js_dir.join("embassyManifest.json")).await?;
js_file
.write_all(&serde_json::to_vec(&manifest_raw).with_kind(ErrorKind::Serialization)?)
.await?;
@@ -239,30 +173,24 @@ impl S9pk<Section<MultiCursorFile>> {
.await?;
archive.insert_path(
Path::new("javascript.squashfs"),
Entry::file(PackSource::File(sqfs_path)),
Entry::file(TmpSource::new(tmp_dir.clone(), PackSource::File(sqfs_path))),
)?;
archive.insert_path(
"manifest.json",
Entry::file(PackSource::Buffered(
serde_json::to_vec::<Manifest>(&new_manifest)
.with_kind(ErrorKind::Serialization)?
.into(),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(
serde_json::to_vec::<Manifest>(&new_manifest)
.with_kind(ErrorKind::Serialization)?
.into(),
),
)),
)?;
let mut s9pk = S9pk::new(MerkleArchive::new(archive, signer, SIG_CONTEXT), None).await?;
let mut dest_file = File::create(destination.as_ref()).await?;
s9pk.serialize(&mut dest_file, false).await?;
dest_file.sync_all().await?;
scratch_dir.delete().await?;
Ok(S9pk::deserialize(
&MultiCursorFile::from(File::open(destination.as_ref()).await?),
None,
)
.await?)
let mut res = S9pk::new(MerkleArchive::new(archive, signer, SIG_CONTEXT), None).await?;
res.as_archive_mut().update_hashes(true).await?;
Ok(res)
}
}
@@ -272,7 +200,7 @@ impl From<ManifestV1> for Manifest {
Self {
id: value.id,
title: value.title,
version: value.version,
version: ExtendedVersion::from(value.version).into(),
release_notes: value.release_notes,
license: value.license.into(),
wrapper_repo: value.wrapper_repo,

View File

@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;
use color_eyre::eyre::eyre;
use exver::Version;
use helpers::const_true;
use imbl_value::InternedString;
pub use models::PackageId;
@@ -20,8 +21,8 @@ use crate::util::serde::Regex;
use crate::util::VersionString;
use crate::version::{Current, VersionT};
fn current_version() -> VersionString {
Current::new().semver().into()
fn current_version() -> Version {
Current::new().semver()
}
#[derive(Clone, Debug, Deserialize, Serialize, HasModel, TS)]
@@ -59,7 +60,8 @@ pub struct Manifest {
#[ts(type = "string | null")]
pub git_hash: Option<GitHash>,
#[serde(default = "current_version")]
pub os_version: VersionString,
#[ts(type = "string")]
pub os_version: Version,
#[serde(default = "const_true")]
pub has_config: bool,
}

View File

@@ -12,10 +12,12 @@ use crate::s9pk::manifest::Manifest;
use crate::s9pk::merkle_archive::file_contents::FileContents;
use crate::s9pk::merkle_archive::sink::Sink;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::{ArchiveSource, DynFileSource, FileSource, Section};
use crate::s9pk::merkle_archive::source::{
ArchiveSource, DynFileSource, FileSource, Section, TmpSource,
};
use crate::s9pk::merkle_archive::{Entry, MerkleArchive};
use crate::s9pk::v2::pack::{ImageSource, PackSource};
use crate::util::io::TmpDir;
use crate::util::io::{open_file, TmpDir};
const MAGIC_AND_VERSION: &[u8] = &[0x3b, 0x3b, 0x02];
@@ -165,8 +167,8 @@ impl<S: FileSource + Clone> S9pk<S> {
}
}
impl<S: From<PackSource> + FileSource + Clone> S9pk<S> {
pub async fn load_images(&mut self, tmpdir: &TmpDir) -> Result<(), Error> {
impl<S: From<TmpSource<PackSource>> + FileSource + Clone> S9pk<S> {
pub async fn load_images(&mut self, tmp_dir: Arc<TmpDir>) -> Result<(), Error> {
let id = &self.manifest.id;
let version = &self.manifest.version;
for (image_id, image_config) in &mut self.manifest.images {
@@ -175,7 +177,7 @@ impl<S: From<PackSource> + FileSource + Clone> S9pk<S> {
image_config
.source
.load(
tmpdir,
tmp_dir.clone(),
id,
version,
image_id,
@@ -206,7 +208,7 @@ impl<S: ArchiveSource + Clone> S9pk<Section<S>> {
)
.await?;
let mut magic_version = [0u8; 3];
let mut magic_version = [0u8; MAGIC_AND_VERSION.len()];
header.read_exact(&mut magic_version).await?;
ensure_code!(
&magic_version == MAGIC_AND_VERSION,
@@ -232,7 +234,7 @@ impl S9pk {
Self::deserialize(&MultiCursorFile::from(file), None).await
}
pub async fn open(path: impl AsRef<Path>, id: Option<&PackageId>) -> Result<Self, Error> {
let res = Self::from_file(tokio::fs::File::open(path).await?).await?;
let res = Self::from_file(open_file(path).await?).await?;
if let Some(id) = id {
ensure_code!(
&res.as_manifest().id == id,

View File

@@ -10,7 +10,6 @@ use futures::{FutureExt, TryStreamExt};
use imbl_value::InternedString;
use models::{ImageId, PackageId, VersionString};
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::process::Command;
use tokio::sync::OnceCell;
@@ -23,12 +22,12 @@ use crate::rpc_continuations::Guid;
use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::{
into_dyn_read, ArchiveSource, DynFileSource, FileSource,
into_dyn_read, ArchiveSource, DynFileSource, FileSource, TmpSource,
};
use crate::s9pk::merkle_archive::{Entry, MerkleArchive};
use crate::s9pk::v2::SIG_CONTEXT;
use crate::s9pk::S9pk;
use crate::util::io::TmpDir;
use crate::util::io::{create_file, open_file, TmpDir};
use crate::util::Invoke;
#[cfg(not(feature = "docker"))]
@@ -64,7 +63,7 @@ impl SqfsDir {
.invoke(ErrorKind::Filesystem)
.await?;
Ok(MultiCursorFile::from(
File::open(&path)
open_file(&path)
.await
.with_ctx(|_| (ErrorKind::Filesystem, path.display()))?,
))
@@ -100,11 +99,7 @@ impl FileSource for PackSource {
async fn reader(&self) -> Result<Self::Reader, Error> {
match self {
Self::Buffered(a) => Ok(into_dyn_read(Cursor::new(a.clone()))),
Self::File(f) => Ok(into_dyn_read(
File::open(f)
.await
.with_ctx(|_| (ErrorKind::Filesystem, f.display()))?,
)),
Self::File(f) => Ok(into_dyn_read(open_file(f).await?)),
Self::Squashfs(dir) => dir.file().await?.fetch_all().await.map(into_dyn_read),
}
}
@@ -284,9 +279,9 @@ pub enum ImageSource {
}
impl ImageSource {
#[instrument(skip_all)]
pub fn load<'a, S: From<PackSource> + FileSource + Clone>(
pub fn load<'a, S: From<TmpSource<PackSource>> + FileSource + Clone>(
&'a self,
tmpdir: &'a TmpDir,
tmp_dir: Arc<TmpDir>,
id: &'a PackageId,
version: &'a VersionString,
image_id: &'a ImageId,
@@ -331,12 +326,13 @@ impl ImageSource {
.arg(&tag)
.arg(&docker_platform)
.arg("-o")
.arg("type=image")
.arg("type=docker,dest=-")
.capture(false)
.pipe(Command::new(CONTAINER_TOOL).arg("load"))
.invoke(ErrorKind::Docker)
.await?;
ImageSource::DockerTag(tag.clone())
.load(tmpdir, id, version, image_id, arch, into)
.load(tmp_dir, id, version, image_id, arch, into)
.await?;
Command::new(CONTAINER_TOOL)
.arg("rmi")
@@ -390,21 +386,24 @@ impl ImageSource {
into.insert_path(
base_path.with_extension("json"),
Entry::file(
PackSource::Buffered(
serde_json::to_vec(&ImageMetadata {
workdir: if config.working_dir == Path::new("") {
"/".into()
} else {
config.working_dir
},
user: if config.user.is_empty() {
"root".into()
} else {
config.user.into()
},
})
.with_kind(ErrorKind::Serialization)?
.into(),
TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(
serde_json::to_vec(&ImageMetadata {
workdir: if config.working_dir == Path::new("") {
"/".into()
} else {
config.working_dir
},
user: if config.user.is_empty() {
"root".into()
} else {
config.user.into()
},
})
.with_kind(ErrorKind::Serialization)?
.into(),
),
)
.into(),
),
@@ -412,10 +411,16 @@ impl ImageSource {
into.insert_path(
base_path.with_extension("env"),
Entry::file(
PackSource::Buffered(config.env.join("\n").into_bytes().into()).into(),
TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(config.env.join("\n").into_bytes().into()),
)
.into(),
),
)?;
let dest = tmpdir.join(Guid::new().as_ref()).with_extension("squashfs");
let dest = tmp_dir
.join(Guid::new().as_ref())
.with_extension("squashfs");
let container = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("create")
@@ -438,7 +443,7 @@ impl ImageSource {
.await?;
into.insert_path(
base_path.with_extension("squashfs"),
Entry::file(PackSource::File(dest).into()),
Entry::file(TmpSource::new(tmp_dir.clone(), PackSource::File(dest)).into()),
)?;
Ok(())
@@ -460,8 +465,8 @@ pub struct ImageMetadata {
#[instrument(skip_all)]
pub async fn pack(ctx: CliContext, params: PackParams) -> Result<(), Error> {
let tmpdir = Arc::new(TmpDir::new().await?);
let mut files = DirectoryContents::<PackSource>::new();
let tmp_dir = Arc::new(TmpDir::new().await?);
let mut files = DirectoryContents::<TmpSource<PackSource>>::new();
let js_dir = params.javascript();
let manifest: Arc<[u8]> = Command::new("node")
.arg("-e")
@@ -474,7 +479,10 @@ pub async fn pack(ctx: CliContext, params: PackParams) -> Result<(), Error> {
.into();
files.insert(
"manifest.json".into(),
Entry::file(PackSource::Buffered(manifest.clone())),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Buffered(manifest.clone()),
)),
);
let icon = params.icon().await?;
let icon_ext = icon
@@ -483,22 +491,28 @@ pub async fn pack(ctx: CliContext, params: PackParams) -> Result<(), Error> {
.to_string_lossy();
files.insert(
InternedString::from_display(&lazy_format!("icon.{}", icon_ext)),
Entry::file(PackSource::File(icon)),
Entry::file(TmpSource::new(tmp_dir.clone(), PackSource::File(icon))),
);
files.insert(
"LICENSE.md".into(),
Entry::file(PackSource::File(params.license())),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::File(params.license()),
)),
);
files.insert(
"instructions.md".into(),
Entry::file(PackSource::File(params.instructions())),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::File(params.instructions()),
)),
);
files.insert(
"javascript.squashfs".into(),
Entry::file(PackSource::Squashfs(Arc::new(SqfsDir::new(
js_dir,
tmpdir.clone(),
)))),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Squashfs(Arc::new(SqfsDir::new(js_dir, tmp_dir.clone()))),
)),
);
let mut s9pk = S9pk::new(
@@ -511,26 +525,29 @@ pub async fn pack(ctx: CliContext, params: PackParams) -> Result<(), Error> {
for assets in s9pk.as_manifest().assets.clone() {
s9pk.as_archive_mut().contents_mut().insert_path(
Path::new("assets").join(&assets).with_extension("squashfs"),
Entry::file(PackSource::Squashfs(Arc::new(SqfsDir::new(
assets_dir.join(&assets),
tmpdir.clone(),
)))),
Entry::file(TmpSource::new(
tmp_dir.clone(),
PackSource::Squashfs(Arc::new(SqfsDir::new(
assets_dir.join(&assets),
tmp_dir.clone(),
))),
)),
)?;
}
s9pk.load_images(&*tmpdir).await?;
s9pk.load_images(tmp_dir.clone()).await?;
s9pk.validate_and_filter(None)?;
s9pk.serialize(
&mut File::create(params.output(&s9pk.as_manifest().id)).await?,
&mut create_file(params.output(&s9pk.as_manifest().id)).await?,
false,
)
.await?;
drop(s9pk);
tmpdir.gc().await?;
tmp_dir.gc().await?;
Ok(())
}

View File

@@ -11,7 +11,6 @@ use persistent_container::PersistentContainer;
use rpc_toolkit::{from_fn_async, CallRemoteHandler, Empty, HandlerArgs, HandlerFor};
use serde::{Deserialize, Serialize};
use start_stop::StartStop;
use tokio::fs::File;
use tokio::sync::Notify;
use ts_rs::TS;
@@ -33,6 +32,7 @@ use crate::status::MainStatus;
use crate::util::actor::background::BackgroundJobQueue;
use crate::util::actor::concurrent::ConcurrentActor;
use crate::util::actor::Actor;
use crate::util::io::create_file;
use crate::util::serde::Pem;
use crate::volume::data_dir;
@@ -403,7 +403,7 @@ impl Service {
#[instrument(skip_all)]
pub async fn backup(&self, guard: impl GenericMountGuard) -> Result<(), Error> {
let id = &self.seed.id;
let mut file = File::create(guard.path().join(id).with_extension("s9pk")).await?;
let mut file = create_file(guard.path().join(id).with_extension("s9pk")).await?;
self.seed
.persistent_container
.s9pk

View File

@@ -9,14 +9,12 @@ use helpers::NonDetachingJoinHandle;
use models::{ImageId, ProcedureName, VolumeId};
use rpc_toolkit::{Empty, Server, ShutdownHandle};
use serde::de::DeserializeOwned;
use tokio::fs::File;
use tokio::process::Command;
use tokio::sync::{oneshot, watch, Mutex, OnceCell};
use tracing::instrument;
use super::service_effect_handler::{service_effect_handler, EffectContext};
use super::transition::{TransitionKind, TransitionState};
use super::ServiceActorSeed;
use crate::context::RpcContext;
use crate::disk::mount::filesystem::bind::Bind;
use crate::disk::mount::filesystem::idmapped::IdMapped;
@@ -32,6 +30,7 @@ use crate::s9pk::merkle_archive::source::FileSource;
use crate::s9pk::S9pk;
use crate::service::start_stop::StartStop;
use crate::service::{rpc, RunningStatus, Service};
use crate::util::io::create_file;
use crate::util::rpc_client::UnixRpcClient;
use crate::util::Invoke;
use crate::volume::{asset_dir, data_dir};
@@ -237,7 +236,7 @@ impl PersistentContainer {
.get_path(Path::new("images").join(arch).join(&env_filename))
.and_then(|e| e.as_file())
{
env.copy(&mut File::create(image_path.join(&env_filename)).await?)
env.copy(&mut create_file(image_path.join(&env_filename)).await?)
.await?;
}
let json_filename = Path::new(image.as_ref()).with_extension("json");
@@ -247,7 +246,7 @@ impl PersistentContainer {
.get_path(Path::new("images").join(arch).join(&json_filename))
.and_then(|e| e.as_file())
{
json.copy(&mut File::create(image_path.join(&json_filename)).await?)
json.copy(&mut create_file(image_path.join(&json_filename)).await?)
.await?;
}
}

View File

@@ -7,8 +7,8 @@ use std::str::FromStr;
use std::sync::{Arc, Weak};
use clap::builder::ValueParserFactory;
use clap::{CommandFactory, FromArgMatches, Parser};
use emver::VersionRange;
use clap::Parser;
use exver::VersionRange;
use imbl_value::json;
use itertools::Itertools;
use models::{
@@ -1383,7 +1383,7 @@ struct CheckDependenciesResult {
is_running: bool,
health_checks: Vec<HealthCheckResult>,
#[ts(type = "string | null")]
version: Option<emver::Version>,
version: Option<exver::ExtendedVersion>,
}
async fn check_dependencies(

View File

@@ -94,12 +94,19 @@ impl ServiceMap {
}
#[instrument(skip_all)]
pub async fn install<S: FileSource + Clone>(
pub async fn install<F, Fut, S: FileSource + Clone>(
&self,
ctx: RpcContext,
mut s9pk: S9pk<S>,
s9pk: F,
recovery_source: Option<impl GenericMountGuard>,
) -> Result<DownloadInstallFuture, Error> {
progress: Option<FullProgressTracker>,
) -> Result<DownloadInstallFuture, Error>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<S9pk<S>, Error>>,
S: FileSource + Clone,
{
let mut s9pk = s9pk().await?;
s9pk.validate_and_filter(ctx.s9pk_arch)?;
let manifest = s9pk.as_manifest().clone();
let id = manifest.id.clone();
@@ -118,7 +125,7 @@ impl ServiceMap {
};
let size = s9pk.size();
let progress = FullProgressTracker::new();
let progress = progress.unwrap_or_else(|| FullProgressTracker::new());
let download_progress_contribution = size.unwrap_or(60);
let mut download_progress = progress.add_phase(
InternedString::intern("Download"),

View File

@@ -8,7 +8,6 @@ use patch_db::json_ptr::ROOT;
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::{from_fn_async, Context, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::try_join;
use tracing::instrument;
@@ -35,7 +34,7 @@ use crate::prelude::*;
use crate::progress::{FullProgress, PhaseProgressTrackerHandle};
use crate::rpc_continuations::Guid;
use crate::util::crypto::EncryptedWire;
use crate::util::io::{dir_copy, dir_size, Counter};
use crate::util::io::{create_file, dir_copy, dir_size, Counter};
use crate::{Error, ErrorKind, ResultExt};
pub fn setup<C: Context>() -> ParentHandler<C> {
@@ -324,7 +323,7 @@ pub async fn execute(
pub async fn complete(ctx: SetupContext) -> Result<SetupResult, Error> {
match ctx.result.get() {
Some(Ok((res, ctx))) => {
let mut guid_file = File::create("/media/startos/config/disk.guid").await?;
let mut guid_file = create_file("/media/startos/config/disk.guid").await?;
guid_file.write_all(ctx.disk_guid.as_bytes()).await?;
guid_file.sync_all().await?;
Ok(res.clone())

View File

@@ -13,6 +13,7 @@ use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
use crate::prelude::*;
use crate::util::clap::FromStrParser;
use crate::util::io::create_file;
use crate::util::serde::{display_serializable, HandlerExtSerde, WithIoFormat};
pub const SSH_AUTHORIZED_KEYS_FILE: &str = "/home/start9/.ssh/authorized_keys";
@@ -229,7 +230,7 @@ pub async fn sync_keys<P: AsRef<Path>>(keys: &SshKeys, dest: P) -> Result<(), Er
if tokio::fs::metadata(ssh_dir).await.is_err() {
tokio::fs::create_dir_all(ssh_dir).await?;
}
let mut f = tokio::fs::File::create(dest).await?;
let mut f = create_file(dest).await?;
for key in keys.0.values() {
f.write_all(key.0.to_key_format().as_bytes()).await?;
f.write_all(b"\n").await?;

View File

@@ -20,6 +20,7 @@ use crate::prelude::*;
use crate::rpc_continuations::RpcContinuations;
use crate::shutdown::Shutdown;
use crate::util::cpupower::{get_available_governors, set_governor, Governor};
use crate::util::io::open_file;
use crate::util::serde::{display_serializable, HandlerExtSerde, WithIoFormat};
use crate::util::Invoke;
@@ -657,7 +658,7 @@ impl ProcStat {
async fn get_proc_stat() -> Result<ProcStat, Error> {
use tokio::io::AsyncBufReadExt;
let mut cpu_line = String::new();
let _n = tokio::io::BufReader::new(tokio::fs::File::open("/proc/stat").await?)
let _n = tokio::io::BufReader::new(open_file("/proc/stat").await?)
.read_line(&mut cpu_line)
.await?;
let stats: Vec<u64> = cpu_line

View File

@@ -4,8 +4,8 @@ use std::time::Duration;
use clap::{ArgAction, Parser};
use color_eyre::eyre::{eyre, Result};
use emver::{Version, VersionRange};
use futures::{FutureExt, TryStreamExt};
use exver::{Version, VersionRange};
use futures::TryStreamExt;
use helpers::{AtomicFile, NonDetachingJoinHandle};
use imbl_value::json;
use itertools::Itertools;

View File

@@ -1,3 +1,4 @@
use std::io::SeekFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
@@ -5,20 +6,20 @@ use std::time::Duration;
use axum::body::Body;
use axum::response::Response;
use futures::StreamExt;
use futures::{ready, FutureExt, StreamExt};
use http::header::CONTENT_LENGTH;
use http::StatusCode;
use imbl_value::InternedString;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::watch;
use crate::context::RpcContext;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation};
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::multi_cursor_file::{FileCursor, MultiCursorFile};
use crate::s9pk::merkle_archive::source::ArchiveSource;
use crate::util::io::TmpDir;
use crate::util::io::{create_file, TmpDir};
pub async fn upload(
ctx: &RpcContext,
@@ -215,14 +216,15 @@ impl UploadingFile {
pub async fn new() -> Result<(UploadHandle, Self), Error> {
let progress = watch::channel(Progress::default());
let tmp_dir = Arc::new(TmpDir::new().await?);
let file = File::create(tmp_dir.join("upload.tmp")).await?;
let file = create_file(tmp_dir.join("upload.tmp")).await?;
let uploading = Self {
tmp_dir,
tmp_dir: tmp_dir.clone(),
file: MultiCursorFile::open(&file).await?,
progress: progress.1,
};
Ok((
UploadHandle {
tmp_dir,
file,
progress: progress.0,
},
@@ -237,22 +239,127 @@ impl UploadingFile {
}
}
impl ArchiveSource for UploadingFile {
type Reader = <MultiCursorFile as ArchiveSource>::Reader;
type FetchReader = <MultiCursorFile as ArchiveSource>::FetchReader;
type FetchAllReader = UploadingFileReader;
async fn size(&self) -> Option<u64> {
Progress::expected_size(&mut self.progress.clone()).await
}
async fn fetch_all(&self) -> Result<impl AsyncRead + Unpin + Send, Error> {
Progress::ready(&mut self.progress.clone()).await?;
self.file.fetch_all().await
async fn fetch_all(&self) -> Result<Self::FetchAllReader, Error> {
let mut file = self.file.cursor().await?;
file.seek(SeekFrom::Start(0)).await?;
Ok(UploadingFileReader {
tmp_dir: self.tmp_dir.clone(),
file,
position: 0,
to_seek: None,
progress: self.progress.clone(),
})
}
async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
async fn fetch(&self, position: u64, size: u64) -> Result<Self::FetchReader, Error> {
Progress::ready_for(&mut self.progress.clone(), position + size).await?;
self.file.fetch(position, size).await
}
}
/// Read half of an in-progress upload: a cursor over the backing temp file
/// that returns `Pending` until the uploader has written enough bytes to
/// satisfy the requested read or seek.
#[pin_project::pin_project(project = UploadingFileReaderProjection)]
pub struct UploadingFileReader {
    // Keeps the upload's temp directory alive for the lifetime of this reader.
    tmp_dir: Arc<TmpDir>,
    // Logical byte offset into the file; kept in sync with the cursor by
    // `poll_read`/`poll_complete`.
    position: u64,
    // Seek target recorded by `start_seek`, applied lazily in `poll_complete`
    // once the upload has progressed far enough.
    to_seek: Option<SeekFrom>,
    #[pin]
    file: FileCursor,
    // Progress of the ongoing upload; polled to wait for data to arrive.
    progress: watch::Receiver<Progress>,
}
impl<'a> UploadingFileReaderProjection<'a> {
    /// Returns `Ok(true)` once the upload is complete according to the
    /// progress channel, `Ok(false)` while it is still in flight. Polling the
    /// `Progress::ready` future registers this task's waker so it is re-woken
    /// on progress updates. Errors from the progress future are surfaced as
    /// `std::io::Error`.
    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Result<bool, std::io::Error> {
        let ready = Progress::ready(&mut *self.progress);
        tokio::pin!(ready);
        Ok(ready
            .poll_unpin(cx)
            .map_err(|e| std::io::Error::other(e.source))?
            .is_ready())
    }
    /// Like `poll_ready`, but only waits until the upload has covered at
    /// least `size` bytes rather than for full completion.
    fn poll_ready_for(
        &mut self,
        cx: &mut std::task::Context<'_>,
        size: u64,
    ) -> Result<bool, std::io::Error> {
        let ready = Progress::ready_for(&mut *self.progress, size);
        tokio::pin!(ready);
        Ok(ready
            .poll_unpin(cx)
            .map_err(|e| std::io::Error::other(e.source))?
            .is_ready())
    }
}
impl AsyncRead for UploadingFileReader {
    /// Reads from the backing temp file, but only once the upload has either
    /// fully completed or progressed past the end of the requested range;
    /// otherwise returns `Pending`. Note both poll helpers are invoked (via
    /// `||` short-circuiting on the *first* success), each registering a
    /// waker on the progress channel.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut this = self.project();
        let position = *this.position;
        if this.poll_ready(cx)? || this.poll_ready_for(cx, position + buf.remaining() as u64)? {
            // Measure how many bytes the inner read actually produced so the
            // logical position stays in sync with the file cursor.
            let start = buf.filled().len();
            let res = this.file.poll_read(cx, buf);
            *this.position += (buf.filled().len() - start) as u64;
            res
        } else {
            Poll::Pending
        }
    }
}
impl AsyncSeek for UploadingFileReader {
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
let this = self.project();
*this.to_seek = Some(position);
Ok(())
}
fn poll_complete(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<u64>> {
let mut this = self.project();
if let Some(to_seek) = *this.to_seek {
let size = match to_seek {
SeekFrom::Current(n) => (*this.position as i64 + n) as u64,
SeekFrom::Start(n) => n,
SeekFrom::End(n) => {
let expected_size = this.progress.borrow().expected_size;
match expected_size {
Some(end) => (end as i64 + n) as u64,
None => {
if !this.poll_ready(cx)? {
return Poll::Pending;
}
(this.progress.borrow().expected_size.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
eyre!("upload maked complete without expected size"),
)
})? as i64
+ n) as u64
}
}
}
};
if !this.poll_ready_for(cx, size)? {
return Poll::Pending;
}
}
if let Some(seek) = this.to_seek.take() {
this.file.as_mut().start_seek(seek)?;
}
*this.position = ready!(this.file.as_mut().poll_complete(cx)?);
Poll::Ready(Ok(*this.position))
}
}
#[pin_project::pin_project(PinnedDrop)]
pub struct UploadHandle {
tmp_dir: Arc<TmpDir>,
#[pin]
file: File,
progress: watch::Sender<Progress>,

View File

@@ -610,13 +610,13 @@ pub fn dir_copy<'a, P0: AsRef<Path> + 'a + Send + Sync, P1: AsRef<Path> + 'a + S
let src_path = e.path();
let dst_path = dst_path.join(e.file_name());
if m.is_file() {
let mut dst_file = tokio::fs::File::create(&dst_path).await.with_ctx(|_| {
let mut dst_file = create_file(&dst_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
format!("create {}", dst_path.display()),
)
})?;
let mut rdr = tokio::fs::File::open(&src_path).await.with_ctx(|_| {
let mut rdr = open_file(&src_path).await.with_ctx(|_| {
(
crate::ErrorKind::Filesystem,
format!("open {}", src_path.display()),
@@ -829,6 +829,13 @@ impl Drop for TmpDir {
}
}
/// Opens the file at `path` for reading, attaching the path to any
/// resulting error so filesystem failures are easy to locate.
pub async fn open_file(path: impl AsRef<Path>) -> Result<File, Error> {
    let path = path.as_ref();
    let opened = File::open(path).await;
    opened.with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("open {path:?}")))
}
pub async fn create_file(path: impl AsRef<Path>) -> Result<File, Error> {
let path = path.as_ref();
if let Some(parent) = path.parent() {

View File

@@ -26,6 +26,7 @@ use tokio::sync::{oneshot, Mutex, OwnedMutexGuard, RwLock};
use tracing::instrument;
use crate::shutdown::Shutdown;
use crate::util::io::create_file;
use crate::{Error, ErrorKind, ResultExt as _};
pub mod actor;
pub mod clap;
@@ -385,16 +386,16 @@ impl<T> SOption<T> for SNone<T> {}
#[async_trait]
pub trait AsyncFileExt: Sized {
async fn maybe_open<P: AsRef<Path> + Send + Sync>(path: P) -> std::io::Result<Option<Self>>;
async fn maybe_open<P: AsRef<Path> + Send + Sync>(path: P) -> Result<Option<Self>, Error>;
async fn delete<P: AsRef<Path> + Send + Sync>(path: P) -> std::io::Result<()>;
}
#[async_trait]
impl AsyncFileExt for File {
async fn maybe_open<P: AsRef<Path> + Send + Sync>(path: P) -> std::io::Result<Option<Self>> {
match File::open(path).await {
async fn maybe_open<P: AsRef<Path> + Send + Sync>(path: P) -> Result<Option<Self>, Error> {
match File::open(path.as_ref()).await {
Ok(f) => Ok(Some(f)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e),
Err(e) => Err(e).with_ctx(|_| (ErrorKind::Filesystem, path.as_ref().display())),
}
}
async fn delete<P: AsRef<Path> + Send + Sync>(path: P) -> std::io::Result<()> {
@@ -590,9 +591,7 @@ impl FileLock {
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, parent.display().to_string()))?;
}
let f = File::create(&path)
.await
.with_ctx(|_| (crate::ErrorKind::Filesystem, path.display().to_string()))?;
let f = create_file(&path).await?;
let file_guard = tokio::task::spawn_blocking(move || {
fd_lock_rs::FdLock::lock(f, fd_lock_rs::LockType::Exclusive, blocking)
})

View File

@@ -3,7 +3,6 @@ use std::path::Path;
use clap::Parser;
use rpc_toolkit::{from_fn_async, Context, ParentHandler};
use serde::{Deserialize, Serialize};
use tokio::fs::File;
use url::Url;
use crate::context::CliContext;
@@ -11,7 +10,7 @@ use crate::prelude::*;
use crate::s9pk::merkle_archive::source::http::HttpSource;
use crate::s9pk::merkle_archive::source::multi_cursor_file::MultiCursorFile;
use crate::s9pk::merkle_archive::source::ArchiveSource;
use crate::util::io::ParallelBlake3Writer;
use crate::util::io::{open_file, ParallelBlake3Writer};
use crate::util::serde::Base16;
use crate::util::Apply;
use crate::CAP_10_MiB;
@@ -40,7 +39,7 @@ pub async fn b3sum(
path: impl AsRef<Path>,
allow_mmap: bool,
) -> Result<Base16<[u8; 32]>, Error> {
let file = MultiCursorFile::from(File::open(path).await?);
let file = MultiCursorFile::from(open_file(path).await?);
if allow_mmap {
return file.blake3_mmap().await.map(|h| *h.as_bytes()).map(Base16);
}

View File

@@ -1,4 +1,3 @@
use std::any::TypeId;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::ops::Deref;
@@ -9,10 +8,9 @@ use clap::{ArgMatches, CommandFactory, FromArgMatches};
use color_eyre::eyre::eyre;
use imbl::OrdMap;
use openssl::pkey::{PKey, Private};
use openssl::x509::{X509Ref, X509};
use openssl::x509::X509;
use rpc_toolkit::{
CliBindings, Context, Handler, HandlerArgs, HandlerArgsFor, HandlerFor, HandlerTypes,
PrintCliResult,
CliBindings, Context, HandlerArgs, HandlerArgsFor, HandlerFor, HandlerTypes, PrintCliResult,
};
use serde::de::DeserializeOwned;
use serde::ser::{SerializeMap, SerializeSeq};
@@ -1188,7 +1186,7 @@ pub trait PemEncoding: Sized {
impl PemEncoding for X509 {
fn from_pem<E: serde::de::Error>(pem: &str) -> Result<Self, E> {
X509::from_pem(pem.as_bytes()).map_err(E::custom)
Self::from_pem(pem.as_bytes()).map_err(E::custom)
}
fn to_pem<E: serde::ser::Error>(&self) -> Result<String, E> {
String::from_utf8((&**self).to_pem().map_err(E::custom)?).map_err(E::custom)
@@ -1197,7 +1195,7 @@ impl PemEncoding for X509 {
impl PemEncoding for PKey<Private> {
fn from_pem<E: serde::de::Error>(pem: &str) -> Result<Self, E> {
PKey::<Private>::private_key_from_pem(pem.as_bytes()).map_err(E::custom)
Self::private_key_from_pem(pem.as_bytes()).map_err(E::custom)
}
fn to_pem<E: serde::ser::Error>(&self) -> Result<String, E> {
String::from_utf8((&**self).private_key_to_pem_pkcs8().map_err(E::custom)?)
@@ -1207,7 +1205,7 @@ impl PemEncoding for PKey<Private> {
impl PemEncoding for ssh_key::PrivateKey {
fn from_pem<E: serde::de::Error>(pem: &str) -> Result<Self, E> {
ssh_key::PrivateKey::from_openssh(pem.as_bytes()).map_err(E::custom)
Self::from_openssh(pem.as_bytes()).map_err(E::custom)
}
fn to_pem<E: serde::ser::Error>(&self) -> Result<String, E> {
self.to_openssh(ssh_key::LineEnding::LF)
@@ -1219,7 +1217,7 @@ impl PemEncoding for ssh_key::PrivateKey {
impl PemEncoding for ed25519_dalek::VerifyingKey {
fn from_pem<E: serde::de::Error>(pem: &str) -> Result<Self, E> {
use ed25519_dalek::pkcs8::DecodePublicKey;
ed25519_dalek::VerifyingKey::from_public_key_pem(pem).map_err(E::custom)
Self::from_public_key_pem(pem).map_err(E::custom)
}
fn to_pem<E: serde::ser::Error>(&self) -> Result<String, E> {
use ed25519_dalek::pkcs8::EncodePublicKey;
@@ -1228,6 +1226,19 @@ impl PemEncoding for ed25519_dalek::VerifyingKey {
}
}
/// PEM (de)serialization for ed25519 signing keys, round-tripping through
/// the PKCS#8 private-key format.
impl PemEncoding for ed25519_dalek::SigningKey {
    fn from_pem<E: serde::de::Error>(pem: &str) -> Result<Self, E> {
        use ed25519_dalek::pkcs8::DecodePrivateKey;
        match Self::from_pkcs8_pem(pem) {
            Ok(key) => Ok(key),
            Err(e) => Err(E::custom(e)),
        }
    }
    fn to_pem<E: serde::ser::Error>(&self) -> Result<String, E> {
        use ed25519_dalek::pkcs8::EncodePrivateKey;
        let pem = self.to_pkcs8_pem(pkcs8::LineEnding::LF).map_err(E::custom)?;
        Ok(pem.as_str().to_owned())
    }
}
pub mod pem {
use serde::{Deserialize, Deserializer, Serializer};

View File

@@ -25,20 +25,20 @@ enum Version {
V0_3_5_1(Wrapper<v0_3_5_1::Version>),
V0_3_5_2(Wrapper<v0_3_5_2::Version>),
V0_3_6(Wrapper<v0_3_6::Version>),
Other(emver::Version),
Other(exver::Version),
}
impl Version {
fn from_util_version(version: crate::util::VersionString) -> Self {
fn from_exver_version(version: exver::Version) -> Self {
serde_json::to_value(version.clone())
.and_then(serde_json::from_value)
.unwrap_or_else(|_e| {
tracing::warn!("Can't deserialize: {:?} and falling back to other", version);
Version::Other(version.into_version())
Version::Other(version)
})
}
#[cfg(test)]
fn as_sem_ver(&self) -> emver::Version {
fn as_exver(&self) -> exver::Version {
match self {
Version::LT0_3_5(LTWrapper(_, x)) => x.clone(),
Version::V0_3_5(Wrapper(x)) => x.semver(),
@@ -56,8 +56,8 @@ where
{
type Previous: VersionT;
fn new() -> Self;
fn semver(&self) -> emver::Version;
fn compat(&self) -> &'static emver::VersionRange;
fn semver(&self) -> exver::Version;
fn compat(&self) -> &'static exver::VersionRange;
fn up(&self, db: &TypedPatchDb<Database>) -> impl Future<Output = Result<(), Error>> + Send;
fn down(&self, db: &TypedPatchDb<Database>) -> impl Future<Output = Result<(), Error>> + Send;
fn commit(
@@ -158,7 +158,7 @@ where
}
#[derive(Debug, Clone)]
struct LTWrapper<T>(T, emver::Version);
struct LTWrapper<T>(T, exver::Version);
impl<T> serde::Serialize for LTWrapper<T>
where
T: VersionT,
@@ -172,10 +172,10 @@ where
T: VersionT,
{
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let v = crate::util::VersionString::deserialize(deserializer)?;
let v = exver::Version::deserialize(deserializer)?;
let version = T::new();
if *v < version.semver() {
Ok(Self(version, v.into_version()))
if v < version.semver() {
Ok(Self(version, v))
} else {
Err(serde::de::Error::custom("Mismatched Version"))
}
@@ -197,9 +197,9 @@ where
T: VersionT,
{
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let v = crate::util::VersionString::deserialize(deserializer)?;
let v = exver::Version::deserialize(deserializer)?;
let version = T::new();
if *v == version.semver() {
if v == version.semver() {
Ok(Wrapper(version))
} else {
Err(serde::de::Error::custom("Mismatched Version"))
@@ -212,7 +212,7 @@ pub async fn init(
mut progress: PhaseProgressTrackerHandle,
) -> Result<(), Error> {
progress.start();
let version = Version::from_util_version(
let version = Version::from_exver_version(
db.peek()
.await
.as_public()
@@ -256,9 +256,18 @@ mod tests {
use super::*;
fn em_version() -> impl Strategy<Value = emver::Version> {
any::<(usize, usize, usize, usize)>().prop_map(|(major, minor, patch, super_minor)| {
emver::Version::new(major, minor, patch, super_minor)
/// proptest strategy producing arbitrary `exver::Version`s for round-trip
/// tests. When `alpha` is set it emits a 0-prefixed version ([0, major,
/// minor] with the trailing component included only when nonzero, so no
/// spurious trailing zero is generated); otherwise a plain three-component
/// version. No pre-release segment in either case.
fn em_version() -> impl Strategy<Value = exver::Version> {
    any::<(usize, usize, usize, bool)>().prop_map(|(major, minor, patch, alpha)| {
        if alpha {
            exver::Version::new(
                [0, major, minor]
                    .into_iter()
                    // drop the last component when it is 0
                    .chain(Some(patch).filter(|n| *n != 0)),
                [],
            )
        } else {
            exver::Version::new([major, minor, patch], [])
        }
    })
}
@@ -273,15 +282,15 @@ mod tests {
proptest! {
#[test]
fn emversion_isomorphic_version(original in em_version()) {
let version = Version::from_util_version(original.clone().into());
let back = version.as_sem_ver();
fn exversion_isomorphic_version(original in em_version()) {
let version = Version::from_exver_version(original.clone().into());
let back = version.as_exver();
prop_assert_eq!(original, back, "All versions should round trip");
}
#[test]
fn version_isomorphic_em_version(version in versions()) {
let sem_ver = version.as_sem_ver();
let back = Version::from_util_version(sem_ver.into());
let sem_ver = version.as_exver();
let back = Version::from_exver_version(sem_ver.into());
prop_assert_eq!(format!("{:?}",version), format!("{:?}", back), "All versions should round trip");
}
}

View File

@@ -1,4 +1,4 @@
use emver::VersionRange;
use exver::{ExtendedVersion, VersionRange};
use super::VersionT;
use crate::db::model::Database;
@@ -6,17 +6,25 @@ use crate::prelude::*;
use crate::version::Current;
lazy_static::lazy_static! {
pub static ref V0_3_0_COMPAT: VersionRange = VersionRange::Conj(
Box::new(VersionRange::Anchor(
emver::GTE,
emver::Version::new(0, 3, 0, 0),
)),
Box::new(VersionRange::Anchor(emver::LTE, Current::new().semver())),
pub static ref V0_3_0_COMPAT: VersionRange = VersionRange::and(
VersionRange::anchor(
exver::GTE,
ExtendedVersion::new(
exver::Version::new([0, 3, 0], []),
exver::Version::default(),
),
),
VersionRange::anchor(
exver::LTE,
ExtendedVersion::new(
Current::new().semver(),
exver::Version::default(),
)
),
);
static ref V0_3_5: exver::Version = exver::Version::new([0, 3, 5], []);
}
const V0_3_5: emver::Version = emver::Version::new(0, 3, 5, 0);
#[derive(Clone, Debug)]
pub struct Version;
@@ -25,8 +33,8 @@ impl VersionT for Version {
fn new() -> Self {
Version
}
fn semver(&self) -> emver::Version {
V0_3_5
fn semver(&self) -> exver::Version {
V0_3_5.clone()
}
fn compat(&self) -> &'static VersionRange {
&V0_3_0_COMPAT

View File

@@ -1,11 +1,13 @@
use emver::VersionRange;
use exver::VersionRange;
use super::v0_3_5::V0_3_0_COMPAT;
use super::{v0_3_5, VersionT};
use crate::db::model::Database;
use crate::prelude::*;
const V0_3_5_1: emver::Version = emver::Version::new(0, 3, 5, 1);
lazy_static::lazy_static! {
static ref V0_3_5_1: exver::Version = exver::Version::new([0, 3, 5, 1], []);
}
#[derive(Clone, Debug)]
pub struct Version;
@@ -15,8 +17,8 @@ impl VersionT for Version {
fn new() -> Self {
Version
}
fn semver(&self) -> emver::Version {
V0_3_5_1
fn semver(&self) -> exver::Version {
V0_3_5_1.clone()
}
fn compat(&self) -> &'static VersionRange {
&V0_3_0_COMPAT

View File

@@ -1,11 +1,13 @@
use emver::VersionRange;
use exver::VersionRange;
use super::v0_3_5::V0_3_0_COMPAT;
use super::{v0_3_5_1, VersionT};
use crate::db::model::Database;
use crate::prelude::*;
const V0_3_5_2: emver::Version = emver::Version::new(0, 3, 5, 2);
lazy_static::lazy_static! {
static ref V0_3_5_2: exver::Version = exver::Version::new([0, 3, 5, 2], []);
}
#[derive(Clone, Debug)]
pub struct Version;
@@ -15,8 +17,8 @@ impl VersionT for Version {
fn new() -> Self {
Version
}
fn semver(&self) -> emver::Version {
V0_3_5_2
fn semver(&self) -> exver::Version {
V0_3_5_2.clone()
}
fn compat(&self) -> &'static VersionRange {
&V0_3_0_COMPAT

View File

@@ -1,11 +1,13 @@
use emver::VersionRange;
use exver::VersionRange;
use super::v0_3_5::V0_3_0_COMPAT;
use super::{v0_3_5_1, VersionT};
use crate::db::model::Database;
use crate::prelude::*;
const V0_3_6: emver::Version = emver::Version::new(0, 3, 6, 0);
lazy_static::lazy_static! {
static ref V0_3_6: exver::Version = exver::Version::new([0, 3, 6], []);
}
#[derive(Clone, Debug)]
pub struct Version;
@@ -15,8 +17,8 @@ impl VersionT for Version {
fn new() -> Self {
Version
}
fn semver(&self) -> emver::Version {
V0_3_6
fn semver(&self) -> exver::Version {
V0_3_6.clone()
}
fn compat(&self) -> &'static VersionRange {
&V0_3_0_COMPAT