build multi-arch s9pks (#2601)

* build multi-arch s9pks

* remove images incrementally

* wip

* prevent rebuild

* fix sdk makefile

* fix hanging on uninstall

* fix build

* fix build

* fix build

* fix build (for real this time)

* fix git hash computation
This commit is contained in:
Aiden McClelland
2024-04-22 11:40:10 -06:00
committed by GitHub
parent 9eff920989
commit 003d110948
176 changed files with 1176 additions and 1799 deletions

View File

@@ -149,6 +149,7 @@ async fn restore_packages(
S9pk::open(
backup_dir.path().join(&id).with_extension("s9pk"),
Some(&id),
true,
)
.await?,
Some(backup_dir),

View File

@@ -19,7 +19,7 @@ impl RequestGuid {
}
pub fn from(r: &str) -> Option<RequestGuid> {
if r.len() != 64 {
if r.len() != 32 {
return None;
}
for c in r.chars() {

View File

@@ -64,10 +64,10 @@ where
.await?;
}
let mut guid = format!(
"EMBASSY_{}",
"STARTOS_{}",
base32::encode(
base32::Alphabet::RFC4648 { padding: false },
&rand::random::<[u8; 32]>(),
&rand::random::<[u8; 20]>(),
)
);
if !encrypted {
@@ -219,7 +219,7 @@ pub async fn import<P: AsRef<Path>>(
if scan
.values()
.filter_map(|a| a.as_ref())
.filter(|a| a.starts_with("EMBASSY_"))
.filter(|a| a.starts_with("STARTOS_") || a.starts_with("EMBASSY_"))
.next()
.is_none()
{

View File

@@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use bytes::Buf;
use lazy_static::lazy_static;
use models::ResultExt;
use tokio::sync::Mutex;
@@ -115,7 +116,7 @@ impl GenericMountGuard for MountGuard {
async fn tmp_mountpoint(source: &impl FileSystem) -> Result<PathBuf, Error> {
Ok(Path::new(TMP_MOUNTPOINT).join(base32::encode(
base32::Alphabet::RFC4648 { padding: false },
&source.source_hash().await?,
&source.source_hash().await?[0..20],
)))
}

View File

@@ -1,127 +0,0 @@
use std::path::PathBuf;
use clap::Parser;
use rpc_toolkit::{command, from_fn_async, AnyContext, HandlerExt, ParentHandler};
use serde::{Deserialize, Serialize};
use crate::context::CliContext;
use crate::s9pk::manifest::Manifest;
// use crate::s9pk::reader::S9pkReader;
use crate::util::serde::HandlerExtSerde;
use crate::Error;
/// Builds the `inspect` CLI subcommand tree for examining s9pk package files.
///
/// Wires each leaf command (`hash`, `manifest`, `license`, `icon`,
/// `instructions`, `docker-images`) to its async handler.
pub fn inspect() -> ParentHandler {
    let handler = ParentHandler::new();
    let handler = handler.subcommand("hash", from_fn_async(hash));
    let handler = handler.subcommand(
        "manifest",
        // manifest output is structured, so render it via the serde display path
        from_fn_async(manifest).with_display_serializable(),
    );
    let handler = handler.subcommand("license", from_fn_async(license).no_display());
    let handler = handler.subcommand("icon", from_fn_async(icon).no_display());
    let handler = handler.subcommand("instructions", from_fn_async(instructions).no_display());
    handler.subcommand("docker-images", from_fn_async(docker_images).no_display())
}
/// CLI arguments for the `inspect hash` subcommand.
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct HashParams {
    // Filesystem path of the .s9pk package whose hash should be printed.
    path: PathBuf,
}
/// Returns the content hash string of the s9pk at `path`.
///
/// Opens the package with verification enabled (`true`), so the hash is
/// computed/checked as part of opening.
pub async fn hash(_: CliContext, HashParams { path }: HashParams) -> Result<String, Error> {
    // NOTE(review): the unwrap assumes `hash_str()` is always Some once the
    // reader was opened with verification on — confirm; otherwise this should
    // be surfaced as an Error rather than a panic.
    Ok(S9pkReader::open(path, true)
        .await?
        .hash_str()
        .unwrap()
        .to_owned())
}
/// CLI arguments for the `inspect manifest` subcommand.
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct ManifestParams {
    // Filesystem path of the .s9pk package to read.
    path: PathBuf,
    // When set, skip signature/hash verification while reading the package.
    #[arg(long = "no-verify")]
    no_verify: bool,
}
// #[command(cli_only, display(display_serializable))]
/// Prints the package manifest for an s9pk.
///
/// Currently unimplemented: the old reader-based implementation is commented
/// out below and the body panics via `todo!()`.
pub async fn manifest(
    _: CliContext,
    ManifestParams { .. }: ManifestParams,
) -> Result<Manifest, Error> {
    // S9pkReader::open(path, !no_verify).await?.manifest().await
    todo!()
}
/// Shared CLI arguments for `inspect license` / `inspect icon` /
/// `inspect docker-images`.
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct InspectParams {
    // Filesystem path of the .s9pk package to read.
    path: PathBuf,
    // When set, skip signature/hash verification while reading the package.
    #[arg(long = "no-verify")]
    no_verify: bool,
}
/// Streams the package's license text from the s9pk at `path` to stdout.
///
/// Verification is enabled unless `--no-verify` was passed.
pub async fn license(
    _: AnyContext,
    InspectParams { path, no_verify }: InspectParams,
) -> Result<(), Error> {
    let mut reader = S9pkReader::open(path, !no_verify).await?;
    let mut section = reader.license().await?;
    let mut stdout = tokio::io::stdout();
    tokio::io::copy(&mut section, &mut stdout).await?;
    Ok(())
}
/// Streams the package's icon bytes from the s9pk at `path` to stdout.
///
/// Verification is enabled unless `--no-verify` was passed.
pub async fn icon(
    _: AnyContext,
    InspectParams { path, no_verify }: InspectParams,
) -> Result<(), Error> {
    let mut reader = S9pkReader::open(path, !no_verify).await?;
    let mut section = reader.icon().await?;
    let mut stdout = tokio::io::stdout();
    tokio::io::copy(&mut section, &mut stdout).await?;
    Ok(())
}
/// CLI arguments for the `inspect instructions` subcommand.
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
pub struct InstructionParams {
    // Filesystem path of the .s9pk package to read.
    path: PathBuf,
    // When set, skip signature/hash verification while reading the package.
    #[arg(long = "no-verify")]
    no_verify: bool,
}
/// Streams the package's instructions document from the s9pk at `path` to
/// stdout.
///
/// Verification is enabled unless `--no-verify` was passed.
pub async fn instructions(
    _: CliContext,
    InstructionParams { path, no_verify }: InstructionParams,
) -> Result<(), Error> {
    let mut reader = S9pkReader::open(path, !no_verify).await?;
    let mut section = reader.instructions().await?;
    let mut stdout = tokio::io::stdout();
    tokio::io::copy(&mut section, &mut stdout).await?;
    Ok(())
}
/// Streams the package's docker-images section from the s9pk at `path` to
/// stdout.
///
/// Verification is enabled unless `--no-verify` was passed.
pub async fn docker_images(
    _: AnyContext,
    InspectParams { path, no_verify }: InspectParams,
) -> Result<(), Error> {
    let mut reader = S9pkReader::open(path, !no_verify).await?;
    let mut section = reader.docker_images().await?;
    let mut stdout = tokio::io::stdout();
    tokio::io::copy(&mut section, &mut stdout).await?;
    Ok(())
}

View File

@@ -148,6 +148,7 @@ pub async fn install(
.parse()?,
)
.await?,
true,
)
.await?;
@@ -257,7 +258,7 @@ pub async fn sideload(ctx: RpcContext) -> Result<SideloadResponse, Error> {
.await;
tokio::spawn(async move {
if let Err(e) = async {
let s9pk = S9pk::deserialize(&file).await?;
let s9pk = S9pk::deserialize(&file, true).await?;
let _ = id_send.send(s9pk.as_manifest().id.clone());
ctx.services
.install(ctx.clone(), s9pk, None::<Never>)
@@ -423,7 +424,12 @@ pub async fn uninstall(
let return_id = id.clone();
tokio::spawn(async move { ctx.services.uninstall(&ctx, &id).await });
tokio::spawn(async move {
if let Err(e) = ctx.services.uninstall(&ctx, &id).await {
tracing::error!("Error uninstalling service {id}: {e}");
tracing::debug!("{e:?}");
}
});
Ok(return_id)
}

View File

@@ -38,8 +38,6 @@ pub mod error;
pub mod firmware;
pub mod hostname;
pub mod init;
pub mod progress;
// pub mod inspect;
pub mod install;
pub mod logs;
pub mod lxc;
@@ -48,6 +46,7 @@ pub mod net;
pub mod notifications;
pub mod os_install;
pub mod prelude;
pub mod progress;
pub mod properties;
pub mod registry;
pub mod s9pk;

View File

@@ -18,17 +18,13 @@ use tokio_stream::wrappers::LinesStream;
use tokio_tungstenite::tungstenite::Message;
use tracing::instrument;
use crate::context::{CliContext, RpcContext};
use crate::core::rpc_continuations::{RequestGuid, RpcContinuation};
use crate::error::ResultExt;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::util::serde::Reversible;
use crate::{
context::{CliContext, RpcContext},
lxc::ContainerId,
};
use crate::{
core::rpc_continuations::{RequestGuid, RpcContinuation},
util::Invoke,
};
use crate::util::Invoke;
#[pin_project::pin_project]
pub struct LogStream {
@@ -393,7 +389,9 @@ pub async fn journalctl(
before: bool,
follow: bool,
) -> Result<LogStream, Error> {
let mut cmd = gen_journalctl_command(&id, limit);
let mut cmd = gen_journalctl_command(&id);
cmd.arg(format!("--lines={}", limit));
let cursor_formatted = format!("--after-cursor={}", cursor.unwrap_or(""));
if cursor.is_some() {
@@ -410,12 +408,15 @@ pub async fn journalctl(
.with_kind(ErrorKind::Deserialization)?;
if follow {
let mut follow_cmd = gen_journalctl_command(&id, limit);
let mut follow_cmd = gen_journalctl_command(&id);
follow_cmd.arg("-f");
if let Some(last) = deserialized_entries.last() {
cmd.arg(format!("--after-cursor={}", last.cursor));
follow_cmd.arg(format!("--after-cursor={}", last.cursor));
follow_cmd.arg("--lines=all");
} else {
follow_cmd.arg("--lines=0");
}
let mut child = cmd.stdout(Stdio::piped()).spawn()?;
let mut child = follow_cmd.stdout(Stdio::piped()).spawn()?;
let out =
BufReader::new(child.stdout.take().ok_or_else(|| {
Error::new(eyre!("No stdout available"), crate::ErrorKind::Journald)
@@ -450,7 +451,7 @@ pub async fn journalctl(
}
}
fn gen_journalctl_command(id: &LogSource, limit: usize) -> Command {
fn gen_journalctl_command(id: &LogSource) -> Command {
let mut cmd = match id {
LogSource::Container(container_id) => {
let mut cmd = Command::new("lxc-attach");
@@ -465,7 +466,6 @@ fn gen_journalctl_command(id: &LogSource, limit: usize) -> Command {
cmd.arg("--output=json");
cmd.arg("--output-fields=MESSAGE");
cmd.arg(format!("-n{}", limit));
match id {
LogSource::Kernel => {
cmd.arg("-k");
@@ -477,7 +477,6 @@ fn gen_journalctl_command(id: &LogSource, limit: usize) -> Command {
LogSource::System => {
cmd.arg("-u");
cmd.arg(SYSTEM_UNIT);
cmd.arg(format!("_COMM={}", SYSTEM_UNIT));
}
LogSource::Container(_container_id) => {
cmd.arg("-u").arg("container-runtime.service");

View File

@@ -288,14 +288,6 @@ impl LxcContainer {
}
self.rootfs.take().unmount(true).await?;
let rootfs_path = self.rootfs_dir();
let err_path = rootfs_path.join("var/log/containerRuntime.err");
if tokio::fs::metadata(&err_path).await.is_ok() {
let mut lines = BufReader::new(File::open(&err_path).await?).lines();
while let Some(line) = lines.next_line().await? {
let container = &**self.guid;
tracing::error!(container, "{}", line);
}
}
if tokio::fs::metadata(&rootfs_path).await.is_ok()
&& tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(&rootfs_path).await?)
.count()

View File

@@ -134,7 +134,7 @@ pub async fn publish(
.with_prefix("[1/3]")
.with_message("Querying s9pk");
pb.enable_steady_tick(Duration::from_millis(200));
let s9pk = S9pk::open(&path, None).await?;
let s9pk = S9pk::open(&path, None, false).await?;
let m = s9pk.as_manifest().clone();
pb.set_style(plain_line_style.clone());
pb.abandon();
@@ -145,7 +145,7 @@ pub async fn publish(
.with_prefix("[1/3]")
.with_message("Verifying s9pk");
pb.enable_steady_tick(Duration::from_millis(200));
let s9pk = S9pk::open(&path, None).await?;
let s9pk = S9pk::open(&path, None, false).await?;
// s9pk.validate().await?;
todo!();
let m = s9pk.as_manifest().clone();

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -60,6 +61,10 @@ fn inspect() -> ParentHandler<S9pkPath> {
.with_inherited(only_parent)
.with_display_serializable(),
)
.subcommand(
"cat",
from_fn_async(cat).with_inherited(only_parent).no_display(),
)
.subcommand(
"manifest",
from_fn_async(inspect_manifest)
@@ -72,109 +77,116 @@ fn inspect() -> ParentHandler<S9pkPath> {
struct AddImageParams {
id: ImageId,
image: String,
arches: Option<Vec<String>>,
}
async fn add_image(
ctx: CliContext,
AddImageParams { id, image }: AddImageParams,
AddImageParams { id, image, arches }: AddImageParams,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<(), Error> {
let mut s9pk = S9pk::from_file(super::load(&ctx, &s9pk_path).await?, false)
.await?
.into_dyn();
let arches: BTreeSet<_> = arches
.unwrap_or_else(|| vec!["x86_64".to_owned(), "aarch64".to_owned()])
.into_iter()
.collect();
let tmpdir = TmpDir::new().await?;
let sqfs_path = tmpdir.join("image.squashfs");
let arch = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg("--entrypoint")
.arg("uname")
.arg(&image)
.arg("-m")
.invoke(ErrorKind::Docker)
.await?,
)?;
let env = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg("--entrypoint")
.arg("env")
.arg(&image)
.invoke(ErrorKind::Docker)
.await?,
)?
.lines()
.filter(|l| {
l.trim()
.split_once("=")
.map_or(false, |(v, _)| !SKIP_ENV.contains(&v))
})
.join("\n")
+ "\n";
let workdir = Path::new(
String::from_utf8(
for arch in arches {
let sqfs_path = tmpdir.join(format!("image.{arch}.squashfs"));
let docker_platform = if arch == "x86_64" {
"--platform=linux/amd64".to_owned()
} else if arch == "aarch64" {
"--platform=linux/arm64".to_owned()
} else {
format!("--platform=linux/{arch}")
};
let env = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg(&docker_platform)
.arg("--entrypoint")
.arg("pwd")
.arg("env")
.arg(&image)
.invoke(ErrorKind::Docker)
.await?,
)?
.trim(),
)
.to_owned();
let container_id = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("create")
.arg(&image)
.lines()
.filter(|l| {
l.trim()
.split_once("=")
.map_or(false, |(v, _)| !SKIP_ENV.contains(&v))
})
.join("\n")
+ "\n";
let workdir = Path::new(
String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg(&docker_platform)
.arg("--rm")
.arg("--entrypoint")
.arg("pwd")
.arg(&image)
.invoke(ErrorKind::Docker)
.await?,
)?
.trim(),
)
.to_owned();
let container_id = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("create")
.arg(&docker_platform)
.arg(&image)
.invoke(ErrorKind::Docker)
.await?,
)?;
Command::new("bash")
.arg("-c")
.arg(format!(
"{CONTAINER_TOOL} export {container_id} | mksquashfs - {sqfs} -tar",
container_id = container_id.trim(),
sqfs = sqfs_path.display()
))
.invoke(ErrorKind::Docker)
.await?,
)?;
Command::new("bash")
.arg("-c")
.arg(format!(
"{CONTAINER_TOOL} export {container_id} | mksquashfs - {sqfs} -tar",
container_id = container_id.trim(),
sqfs = sqfs_path.display()
))
.invoke(ErrorKind::Docker)
.await?;
Command::new(CONTAINER_TOOL)
.arg("rm")
.arg(container_id.trim())
.invoke(ErrorKind::Docker)
.await?;
let mut s9pk = S9pk::from_file(super::load(&ctx, &s9pk_path).await?)
.await?
.into_dyn();
let archive = s9pk.as_archive_mut();
archive.set_signer(ctx.developer_key()?.clone());
archive.contents_mut().insert_path(
Path::new("images")
.join(arch.trim())
.join(&id)
.with_extension("squashfs"),
Entry::file(DynFileSource::new(sqfs_path)),
)?;
archive.contents_mut().insert_path(
Path::new("images")
.join(arch.trim())
.join(&id)
.with_extension("env"),
Entry::file(DynFileSource::new(Arc::from(Vec::from(env)))),
)?;
archive.contents_mut().insert_path(
Path::new("images")
.join(arch.trim())
.join(&id)
.with_extension("json"),
Entry::file(DynFileSource::new(Arc::from(
serde_json::to_vec(&serde_json::json!({
"workdir": workdir
}))
.with_kind(ErrorKind::Serialization)?,
))),
)?;
.await?;
Command::new(CONTAINER_TOOL)
.arg("rm")
.arg(container_id.trim())
.invoke(ErrorKind::Docker)
.await?;
let archive = s9pk.as_archive_mut();
archive.set_signer(ctx.developer_key()?.clone());
archive.contents_mut().insert_path(
Path::new("images")
.join(&arch)
.join(&id)
.with_extension("squashfs"),
Entry::file(DynFileSource::new(sqfs_path)),
)?;
archive.contents_mut().insert_path(
Path::new("images")
.join(&arch)
.join(&id)
.with_extension("env"),
Entry::file(DynFileSource::new(Arc::from(Vec::from(env)))),
)?;
archive.contents_mut().insert_path(
Path::new("images")
.join(&arch)
.join(&id)
.with_extension("json"),
Entry::file(DynFileSource::new(Arc::from(
serde_json::to_vec(&serde_json::json!({
"workdir": workdir
}))
.with_kind(ErrorKind::Serialization)?,
))),
)?;
}
s9pk.as_manifest_mut().images.insert(id);
let tmp_path = s9pk_path.with_extension("s9pk.tmp");
let mut tmp_file = File::create(&tmp_path).await?;
s9pk.serialize(&mut tmp_file, true).await?;
@@ -193,7 +205,7 @@ async fn edit_manifest(
EditManifestParams { expression }: EditManifestParams,
S9pkPath { s9pk: s9pk_path }: S9pkPath,
) -> Result<Manifest, Error> {
let mut s9pk = S9pk::from_file(super::load(&ctx, &s9pk_path).await?).await?;
let mut s9pk = S9pk::from_file(super::load(&ctx, &s9pk_path).await?, false).await?;
let old = serde_json::to_value(s9pk.as_manifest()).with_kind(ErrorKind::Serialization)?;
*s9pk.as_manifest_mut() = serde_json::from_value(apply_expr(old.into(), &expression)?.into())
.with_kind(ErrorKind::Serialization)?;
@@ -214,15 +226,45 @@ async fn file_tree(
_: Empty,
S9pkPath { s9pk }: S9pkPath,
) -> Result<Vec<PathBuf>, Error> {
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?).await?;
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?, false).await?;
Ok(s9pk.as_archive().contents().file_paths(""))
}
/// CLI arguments for the `inspect cat` subcommand.
#[derive(Deserialize, Serialize, Parser, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
struct CatParams {
    // Path of the file *inside* the s9pk archive to print.
    file_path: PathBuf,
}
/// Writes the raw bytes of `file_path` from inside the s9pk archive to stdout.
///
/// Errors with a not-found error if the path is absent from the archive or
/// refers to something that is not a regular file entry.
async fn cat(
    ctx: CliContext,
    CatParams { file_path }: CatParams,
    S9pkPath { s9pk }: S9pkPath,
) -> Result<(), Error> {
    use crate::s9pk::merkle_archive::source::FileSource;

    // Open without applying the install-time filter (false) so any file in the
    // archive can be inspected.
    let pkg = S9pk::from_file(super::load(&ctx, &s9pk).await?, false).await?;
    let entry = pkg
        .as_archive()
        .contents()
        .get_path(&file_path)
        .or_not_found(&file_path.display())?;
    let file = entry.as_file().or_not_found(&file_path.display())?;
    let mut reader = file.reader().await?;
    tokio::io::copy(&mut reader, &mut tokio::io::stdout()).await?;
    Ok(())
}
async fn inspect_manifest(
ctx: CliContext,
_: Empty,
S9pkPath { s9pk }: S9pkPath,
) -> Result<Manifest, Error> {
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?).await?;
let s9pk = S9pk::from_file(super::load(&ctx, &s9pk).await?, false).await?;
Ok(s9pk.as_manifest().clone())
}

View File

@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::io::SeekFrom;
use std::path::Path;
@@ -10,7 +9,7 @@ use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio_tar::{Archive, Entry};
use crate::util::io::from_cbor_async_reader;
use crate::{Error, ErrorKind, ARCH};
use crate::{Error, ErrorKind};
#[derive(Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
@@ -26,8 +25,8 @@ pub enum DockerReader<R: AsyncRead + Unpin> {
MultiArch(#[pin] Entry<Archive<R>>),
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> DockerReader<R> {
pub async fn new(mut rdr: R) -> Result<Self, Error> {
let arch = if let Some(multiarch) = tokio_tar::Archive::new(&mut rdr)
pub async fn list_arches(rdr: &mut R) -> Result<BTreeSet<String>, Error> {
if let Some(multiarch) = tokio_tar::Archive::new(rdr)
.entries()?
.try_filter_map(|e| {
async move {
@@ -43,41 +42,38 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> DockerReader<R> {
.await?
{
let multiarch: DockerMultiArch = from_cbor_async_reader(multiarch).await?;
Some(if multiarch.available.contains(&**ARCH) {
Cow::Borrowed(&**ARCH)
} else {
Cow::Owned(multiarch.default)
})
Ok(multiarch.available)
} else {
None
};
Err(Error::new(
eyre!("Single arch legacy s9pks not supported"),
ErrorKind::ParseS9pk,
))
}
}
pub async fn new(mut rdr: R, arch: &str) -> Result<Self, Error> {
rdr.seek(SeekFrom::Start(0)).await?;
if let Some(arch) = arch {
if let Some(image) = tokio_tar::Archive::new(rdr)
.entries()?
.try_filter_map(|e| {
let arch = arch.clone();
async move {
Ok(if &*e.path()? == Path::new(&format!("{}.tar", arch)) {
Some(e)
} else {
None
})
}
.boxed()
})
.try_next()
.await?
{
Ok(Self::MultiArch(image))
} else {
Err(Error::new(
eyre!("Docker image section does not contain tarball for architecture"),
ErrorKind::ParseS9pk,
))
}
if let Some(image) = tokio_tar::Archive::new(rdr)
.entries()?
.try_filter_map(|e| {
let arch = arch.clone();
async move {
Ok(if &*e.path()? == Path::new(&format!("{}.tar", arch)) {
Some(e)
} else {
None
})
}
.boxed()
})
.try_next()
.await?
{
Ok(Self::MultiArch(image))
} else {
Ok(Self::SingleArch(rdr))
Err(Error::new(
eyre!("Docker image section does not contain tarball for architecture"),
ErrorKind::ParseS9pk,
))
}
}
}

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::io::SeekFrom;
use std::ops::Range;
use std::path::Path;
@@ -158,8 +159,8 @@ impl S9pkReader {
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
#[instrument(skip_all)]
pub async fn image_tags(&mut self) -> Result<Vec<ImageTag>, Error> {
let mut tar = tokio_tar::Archive::new(self.docker_images().await?);
pub async fn image_tags(&mut self, arch: &str) -> Result<Vec<ImageTag>, Error> {
let mut tar = tokio_tar::Archive::new(self.docker_images(arch).await?);
let mut entries = tar.entries()?;
while let Some(mut entry) = entries.try_next().await? {
if &*entry.path()? != Path::new("manifest.json") {
@@ -280,8 +281,15 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
self.read_handle(self.toc.icon).await
}
pub async fn docker_images(&mut self) -> Result<DockerReader<ReadHandle<'_, R>>, Error> {
DockerReader::new(self.read_handle(self.toc.docker_images).await?).await
pub async fn docker_arches(&mut self) -> Result<BTreeSet<String>, Error> {
DockerReader::list_arches(&mut self.read_handle(self.toc.docker_images).await?).await
}
pub async fn docker_images(
&mut self,
arch: &str,
) -> Result<DockerReader<ReadHandle<'_, R>>, Error> {
DockerReader::new(self.read_handle(self.toc.docker_images).await?, arch).await
}
pub async fn assets(&mut self) -> Result<ReadHandle<'_, R>, Error> {

View File

@@ -21,7 +21,6 @@ use crate::s9pk::v1::reader::S9pkReader;
use crate::s9pk::v2::S9pk;
use crate::util::io::TmpDir;
use crate::util::Invoke;
use crate::ARCH;
pub const MAGIC_AND_VERSION: &[u8] = &[0x3b, 0x3b, 0x01];
@@ -96,155 +95,166 @@ impl S9pk<Section<MultiCursorFile>> {
)?;
// images
let images_dir = scratch_dir.join("images");
tokio::fs::create_dir_all(&images_dir).await?;
Command::new(CONTAINER_TOOL)
.arg("load")
.input(Some(&mut reader.docker_images().await?))
.invoke(ErrorKind::Docker)
.await?;
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct DockerImagesOut {
repository: Option<String>,
tag: Option<String>,
#[serde(default)]
names: Vec<String>,
}
for image in {
#[cfg(feature = "docker")]
let images = std::str::from_utf8(
&Command::new(CONTAINER_TOOL)
.arg("images")
.arg("--format=json")
.invoke(ErrorKind::Docker)
.await?,
)?
.lines()
.map(|l| serde_json::from_str::<DockerImagesOut>(l))
.collect::<Result<Vec<_>, _>>()
.with_kind(ErrorKind::Deserialization)?
.into_iter();
#[cfg(not(feature = "docker"))]
let images = serde_json::from_slice::<Vec<DockerImagesOut>>(
&Command::new(CONTAINER_TOOL)
.arg("images")
.arg("--format=json")
.invoke(ErrorKind::Docker)
.await?,
)
.with_kind(ErrorKind::Deserialization)?
.into_iter();
images
}
.flat_map(|i| {
if let (Some(repository), Some(tag)) = (i.repository, i.tag) {
vec![format!("{repository}:{tag}")]
for arch in reader.docker_arches().await? {
let images_dir = scratch_dir.join("images").join(&arch);
let docker_platform = if arch == "x86_64" {
"--platform=linux/amd64".to_owned()
} else if arch == "aarch64" {
"--platform=linux/arm64".to_owned()
} else {
i.names
.into_iter()
.filter_map(|i| i.strip_prefix("docker.io/").map(|s| s.to_owned()))
.collect()
format!("--platform=linux/{arch}")
};
tokio::fs::create_dir_all(&images_dir).await?;
Command::new(CONTAINER_TOOL)
.arg("load")
.input(Some(&mut reader.docker_images(&arch).await?))
.invoke(ErrorKind::Docker)
.await?;
#[derive(serde::Deserialize)]
#[serde(rename_all = "PascalCase")]
struct DockerImagesOut {
repository: Option<String>,
tag: Option<String>,
#[serde(default)]
names: Vec<String>,
}
})
.filter_map(|i| {
i.strip_suffix(&format!(":{}", manifest.version))
.map(|s| s.to_owned())
})
.filter_map(|i| {
i.strip_prefix(&format!("start9/{}/", manifest.id))
.map(|s| s.to_owned())
}) {
new_manifest.images.insert(image.parse()?);
let sqfs_path = images_dir.join(&image).with_extension("squashfs");
let image_name = format!("start9/{}/{}:{}", manifest.id, image, manifest.version);
let id = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("create")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?;
let env = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg("--entrypoint")
.arg("env")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?
.lines()
.filter(|l| {
l.trim()
.split_once("=")
.map_or(false, |(v, _)| !SKIP_ENV.contains(&v))
for image in {
#[cfg(feature = "docker")]
let images = std::str::from_utf8(
&Command::new(CONTAINER_TOOL)
.arg("images")
.arg("--format=json")
.invoke(ErrorKind::Docker)
.await?,
)?
.lines()
.map(|l| serde_json::from_str::<DockerImagesOut>(l))
.collect::<Result<Vec<_>, _>>()
.with_kind(ErrorKind::Deserialization)?
.into_iter();
#[cfg(not(feature = "docker"))]
let images = serde_json::from_slice::<Vec<DockerImagesOut>>(
&Command::new(CONTAINER_TOOL)
.arg("images")
.arg("--format=json")
.invoke(ErrorKind::Docker)
.await?,
)
.with_kind(ErrorKind::Deserialization)?
.into_iter();
images
}
.flat_map(|i| {
if let (Some(repository), Some(tag)) = (i.repository, i.tag) {
vec![format!("{repository}:{tag}")]
} else {
i.names
.into_iter()
.filter_map(|i| i.strip_prefix("docker.io/").map(|s| s.to_owned()))
.collect()
}
})
.join("\n")
+ "\n";
let workdir = Path::new(
String::from_utf8(
.filter_map(|i| {
i.strip_suffix(&format!(":{}", manifest.version))
.map(|s| s.to_owned())
})
.filter_map(|i| {
i.strip_prefix(&format!("start9/{}/", manifest.id))
.map(|s| s.to_owned())
}) {
new_manifest.images.insert(image.parse()?);
let sqfs_path = images_dir.join(&image).with_extension("squashfs");
let image_name = format!("start9/{}/{}:{}", manifest.id, image, manifest.version);
let id = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("create")
.arg(&docker_platform)
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?;
let env = String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg(&docker_platform)
.arg("--entrypoint")
.arg("pwd")
.arg("env")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?
.trim(),
)
.to_owned();
Command::new("bash")
.arg("-c")
.arg(format!(
"{CONTAINER_TOOL} export {id} | mksquashfs - {sqfs} -tar",
id = id.trim(),
sqfs = sqfs_path.display()
))
.invoke(ErrorKind::Docker)
.await?;
Command::new(CONTAINER_TOOL)
.arg("rm")
.arg(id.trim())
.invoke(ErrorKind::Docker)
.await?;
archive.insert_path(
Path::new("images")
.join(&*ARCH)
.join(&image)
.with_extension("squashfs"),
Entry::file(CompatSource::File(sqfs_path)),
)?;
archive.insert_path(
Path::new("images")
.join(&*ARCH)
.join(&image)
.with_extension("env"),
Entry::file(CompatSource::Buffered(Vec::from(env).into())),
)?;
archive.insert_path(
Path::new("images")
.join(&*ARCH)
.join(&image)
.with_extension("json"),
Entry::file(CompatSource::Buffered(
serde_json::to_vec(&serde_json::json!({
"workdir": workdir
}))
.with_kind(ErrorKind::Serialization)?
.into(),
)),
)?;
.lines()
.filter(|l| {
l.trim()
.split_once("=")
.map_or(false, |(v, _)| !SKIP_ENV.contains(&v))
})
.join("\n")
+ "\n";
let workdir = Path::new(
String::from_utf8(
Command::new(CONTAINER_TOOL)
.arg("run")
.arg("--rm")
.arg(&docker_platform)
.arg("--entrypoint")
.arg("pwd")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?,
)?
.trim(),
)
.to_owned();
Command::new("bash")
.arg("-c")
.arg(format!(
"{CONTAINER_TOOL} export {id} | mksquashfs - {sqfs} -tar",
id = id.trim(),
sqfs = sqfs_path.display()
))
.invoke(ErrorKind::Docker)
.await?;
Command::new(CONTAINER_TOOL)
.arg("rm")
.arg(id.trim())
.invoke(ErrorKind::Docker)
.await?;
archive.insert_path(
Path::new("images")
.join(&arch)
.join(&image)
.with_extension("squashfs"),
Entry::file(CompatSource::File(sqfs_path)),
)?;
archive.insert_path(
Path::new("images")
.join(&arch)
.join(&image)
.with_extension("env"),
Entry::file(CompatSource::Buffered(Vec::from(env).into())),
)?;
archive.insert_path(
Path::new("images")
.join(&arch)
.join(&image)
.with_extension("json"),
Entry::file(CompatSource::Buffered(
serde_json::to_vec(&serde_json::json!({
"workdir": workdir
}))
.with_kind(ErrorKind::Serialization)?
.into(),
)),
)?;
Command::new(CONTAINER_TOOL)
.arg("rmi")
.arg(&image_name)
.invoke(ErrorKind::Docker)
.await?;
}
}
Command::new(CONTAINER_TOOL)
.arg("image")
.arg("prune")
.arg("-af")
.invoke(ErrorKind::Docker)
.await?;
// assets
let asset_dir = scratch_dir.join("assets");
@@ -312,9 +322,10 @@ impl S9pk<Section<MultiCursorFile>> {
scratch_dir.delete().await?;
Ok(S9pk::deserialize(&MultiCursorFile::from(
File::open(destination.as_ref()).await?,
))
Ok(S9pk::deserialize(
&MultiCursorFile::from(File::open(destination.as_ref()).await?),
false,
)
.await?)
}
}

View File

@@ -173,7 +173,7 @@ impl<S: FileSource> S9pk<S> {
impl<S: ArchiveSource> S9pk<Section<S>> {
#[instrument(skip_all)]
pub async fn deserialize(source: &S) -> Result<Self, Error> {
pub async fn deserialize(source: &S, apply_filter: bool) -> Result<Self, Error> {
use tokio::io::AsyncReadExt;
let mut header = source
@@ -193,7 +193,9 @@ impl<S: ArchiveSource> S9pk<Section<S>> {
let mut archive = MerkleArchive::deserialize(source, &mut header).await?;
archive.filter(filter)?;
if apply_filter {
archive.filter(filter)?;
}
archive.sort_by(|a, b| match (priority(a), priority(b)) {
(Some(a), Some(b)) => a.cmp(&b),
@@ -206,11 +208,15 @@ impl<S: ArchiveSource> S9pk<Section<S>> {
}
}
impl S9pk {
pub async fn from_file(file: File) -> Result<Self, Error> {
Self::deserialize(&MultiCursorFile::from(file)).await
pub async fn from_file(file: File, apply_filter: bool) -> Result<Self, Error> {
Self::deserialize(&MultiCursorFile::from(file), apply_filter).await
}
pub async fn open(path: impl AsRef<Path>, id: Option<&PackageId>) -> Result<Self, Error> {
let res = Self::from_file(tokio::fs::File::open(path).await?).await?;
pub async fn open(
path: impl AsRef<Path>,
id: Option<&PackageId>,
apply_filter: bool,
) -> Result<Self, Error> {
let res = Self::from_file(tokio::fs::File::open(path).await?, apply_filter).await?;
if let Some(id) = id {
ensure_code!(
&res.as_manifest().id == id,

View File

@@ -13,12 +13,14 @@ use start_stop::StartStop;
use tokio::sync::Notify;
use ts_rs::TS;
use crate::context::{CliContext, RpcContext};
use crate::core::rpc_continuations::RequestGuid;
use crate::db::model::package::{
InstalledState, PackageDataEntry, PackageState, PackageStateMatchModelRef, UpdatingState,
};
use crate::disk::mount::guard::GenericMountGuard;
use crate::install::PKG_ARCHIVE_DIR;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::progress::{NamedProgress, Progress};
use crate::s9pk::S9pk;
@@ -31,10 +33,6 @@ use crate::util::actor::concurrent::ConcurrentActor;
use crate::util::actor::Actor;
use crate::util::serde::Pem;
use crate::volume::data_dir;
use crate::{
context::{CliContext, RpcContext},
lxc::ContainerId,
};
mod action;
pub mod cli;
@@ -138,7 +136,7 @@ impl Service {
match entry.as_state_info().as_match() {
PackageStateMatchModelRef::Installing(_) => {
if disposition == LoadDisposition::Retry {
if let Ok(s9pk) = S9pk::open(s9pk_path, Some(id)).await.map_err(|e| {
if let Ok(s9pk) = S9pk::open(s9pk_path, Some(id), true).await.map_err(|e| {
tracing::error!("Error opening s9pk for install: {e}");
tracing::debug!("{e:?}")
}) {
@@ -171,7 +169,7 @@ impl Service {
&& progress == &Progress::Complete(true)
})
{
if let Ok(s9pk) = S9pk::open(&s9pk_path, Some(id)).await.map_err(|e| {
if let Ok(s9pk) = S9pk::open(&s9pk_path, Some(id), true).await.map_err(|e| {
tracing::error!("Error opening s9pk for update: {e}");
tracing::debug!("{e:?}")
}) {
@@ -190,7 +188,7 @@ impl Service {
}
}
}
let s9pk = S9pk::open(s9pk_path, Some(id)).await?;
let s9pk = S9pk::open(s9pk_path, Some(id), true).await?;
ctx.db
.mutate({
|db| {
@@ -215,7 +213,7 @@ impl Service {
handle_installed(s9pk, entry).await
}
PackageStateMatchModelRef::Removing(_) | PackageStateMatchModelRef::Restoring(_) => {
if let Ok(s9pk) = S9pk::open(s9pk_path, Some(id)).await.map_err(|e| {
if let Ok(s9pk) = S9pk::open(s9pk_path, Some(id), true).await.map_err(|e| {
tracing::error!("Error opening s9pk for removal: {e}");
tracing::debug!("{e:?}")
}) {
@@ -243,7 +241,7 @@ impl Service {
Ok(None)
}
PackageStateMatchModelRef::Installed(_) => {
handle_installed(S9pk::open(s9pk_path, Some(id)).await?, entry).await
handle_installed(S9pk::open(s9pk_path, Some(id), true).await?, entry).await
}
PackageStateMatchModelRef::Error(e) => Err(Error::new(
eyre!("Failed to parse PackageDataEntry, found {e:?}"),
@@ -349,6 +347,7 @@ impl Service {
}
Ok(())
}
pub async fn backup(&self, _guard: impl GenericMountGuard) -> Result<BackupReturn, Error> {
// TODO
Err(Error::new(eyre!("not yet implemented"), ErrorKind::Unknown))

View File

@@ -4,7 +4,7 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use futures::future::ready;
use futures::Future;
use futures::{Future, FutureExt};
use helpers::NonDetachingJoinHandle;
use imbl_value::InternedString;
use models::{ProcedureName, VolumeId};
@@ -92,6 +92,7 @@ pub struct PersistentContainer {
pub(super) overlays: Arc<Mutex<BTreeMap<InternedString, OverlayGuard>>>,
pub(super) state: Arc<watch::Sender<ServiceState>>,
pub(super) net_service: Mutex<NetService>,
destroyed: bool,
}
impl PersistentContainer {
@@ -217,6 +218,7 @@ impl PersistentContainer {
overlays: Arc::new(Mutex::new(BTreeMap::new())),
state: Arc::new(watch::channel(ServiceState::new(start)).0),
net_service: Mutex::new(net_service),
destroyed: false,
})
}
@@ -285,7 +287,10 @@ impl PersistentContainer {
}
#[instrument(skip_all)]
fn destroy(&mut self) -> impl Future<Output = Result<(), Error>> + 'static {
fn destroy(&mut self) -> Option<impl Future<Output = Result<(), Error>> + 'static> {
if self.destroyed {
return None;
}
let rpc_client = self.rpc_client.clone();
let rpc_server = self.rpc_server.send_replace(None);
let js_mount = self.js_mount.take();
@@ -293,33 +298,45 @@ impl PersistentContainer {
let assets = std::mem::take(&mut self.assets);
let overlays = self.overlays.clone();
let lxc_container = self.lxc_container.take();
async move {
let mut errs = ErrorCollection::new();
if let Some((hdl, shutdown)) = rpc_server {
errs.handle(rpc_client.request(rpc::Exit, Empty {}).await);
shutdown.shutdown();
errs.handle(hdl.await.with_kind(ErrorKind::Cancelled));
self.destroyed = true;
Some(
async move {
dbg!(
async move {
let mut errs = ErrorCollection::new();
if let Some((hdl, shutdown)) = rpc_server {
errs.handle(rpc_client.request(rpc::Exit, Empty {}).await);
shutdown.shutdown();
errs.handle(hdl.await.with_kind(ErrorKind::Cancelled));
}
for (_, volume) in volumes {
errs.handle(volume.unmount(true).await);
}
for (_, assets) in assets {
errs.handle(assets.unmount(true).await);
}
for (_, overlay) in std::mem::take(&mut *overlays.lock().await) {
errs.handle(overlay.unmount(true).await);
}
errs.handle(js_mount.unmount(true).await);
if let Some(lxc_container) = lxc_container {
errs.handle(lxc_container.exit().await);
}
dbg!(errs.into_result())
}
.await
)
}
for (_, volume) in volumes {
errs.handle(volume.unmount(true).await);
}
for (_, assets) in assets {
errs.handle(assets.unmount(true).await);
}
for (_, overlay) in std::mem::take(&mut *overlays.lock().await) {
errs.handle(overlay.unmount(true).await);
}
errs.handle(js_mount.unmount(true).await);
if let Some(lxc_container) = lxc_container {
errs.handle(lxc_container.exit().await);
}
errs.into_result()
}
.map(|a| dbg!(a)),
)
}
#[instrument(skip_all)]
pub async fn exit(mut self) -> Result<(), Error> {
self.destroy().await?;
if let Some(destroy) = self.destroy() {
dbg!(destroy.await)?;
}
tracing::info!("Service for {} exited", self.s9pk.as_manifest().id);
Ok(())
}
@@ -416,7 +433,8 @@ impl PersistentContainer {
impl Drop for PersistentContainer {
fn drop(&mut self) {
let destroy = self.destroy();
tokio::spawn(async move { destroy.await.unwrap() });
if let Some(destroy) = self.destroy() {
tokio::spawn(async move { destroy.await.unwrap() });
}
}
}

View File

@@ -1165,6 +1165,7 @@ async fn set_dependencies(
.join(&format!("package/v2/{}.s9pk?spec={}", dep_id, version_spec))?,
)
.await?,
true,
)
.await?;

View File

@@ -42,10 +42,9 @@ pub struct InstallProgressHandles {
pub struct ServiceMap(Mutex<OrdMap<PackageId, Arc<RwLock<Option<Service>>>>>);
impl ServiceMap {
async fn entry(&self, id: &PackageId) -> Arc<RwLock<Option<Service>>> {
self.0
.lock()
.await
.entry(id.clone())
let mut lock = self.0.lock().await;
dbg!(lock.keys().collect::<Vec<_>>());
lock.entry(id.clone())
.or_insert_with(|| Arc::new(RwLock::new(None)))
.clone()
}
@@ -230,7 +229,7 @@ impl ServiceMap {
.await?;
Ok(reload_guard
.handle_last(async move {
let s9pk = S9pk::open(&installed_path, Some(&id)).await?;
let s9pk = S9pk::open(&installed_path, Some(&id), true).await?;
let prev = if let Some(service) = service.take() {
ensure_code!(
recovery_source.is_none(),
@@ -293,9 +292,14 @@ impl ServiceMap {
/// This is ran during the cleanup, so when we are uninstalling the service
#[instrument(skip_all)]
pub async fn uninstall(&self, ctx: &RpcContext, id: &PackageId) -> Result<(), Error> {
if let Some(service) = self.get_mut(id).await.take() {
let mut guard = self.get_mut(id).await;
if let Some(service) = guard.take() {
ServiceReloadGuard::new(ctx.clone(), id.clone(), "Uninstall")
.handle_last(service.uninstall(None))
.handle_last(async move {
let res = service.uninstall(None).await;
drop(guard);
res
})
.await?;
}
Ok(())

View File

@@ -96,7 +96,7 @@ impl<A: Actor + Clone> Future for ConcurrentRunner<A> {
cont
} {}
let _ = this.bg_runner.as_mut().poll(cx);
if this.waiting.is_empty() && this.handlers.is_empty() && this.bg_runner.is_empty() {
if this.waiting.is_empty() && this.handlers.is_empty() && this.recv.is_closed() {
std::task::Poll::Ready(())
} else {
std::task::Poll::Pending

View File

@@ -498,7 +498,7 @@ impl<'a, T> From<&'a T> for MaybeOwned<'a, T> {
pub fn new_guid() -> InternedString {
use rand::RngCore;
let mut buf = [0; 40];
let mut buf = [0; 20];
rand::thread_rng().fill_bytes(&mut buf);
InternedString::intern(base32::encode(
base32::Alphabet::RFC4648 { padding: false },