Feat/js long running (#1879)

* wip: combining the streams

* chore: Testing locally

* chore: Fix some lint

* Feat/long running (#1676)

* feat: Start the long running container

* feat: Long running docker, running, stoping, and uninstalling

* feat: Just make the folders that we would like to mount.

* fix: Uninstall not working

* chore: remove some logging

* feat: Smarter cleanup

* feat: Wait for start

* wip: Need to kill

* chore: Remove the bad tracing

* feat: Stopping the long running processes without killing the long
running

* Mino Feat: Change the Manifest To have a new type (#1736)

* Add build-essential to README.md (#1716)

Update README.md

* write image to sparse-aware archive format (#1709)

* fix: Add modification to the max_user_watches (#1695)

* fix: Add modification to the max_user_watches

* chore: Move to initialization

* [Feat] follow logs (#1714)

* tail logs

* add cli

* add FE

* abstract http to shared

* batch new logs

* file download for logs

* fix modal error when no config

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: BluJ <mogulslayer@gmail.com>

* Update README.md (#1728)

* fix build for patch-db client for consistency (#1722)

* fix cli install (#1720)

* highlight instructions if not viewed (#1731)

* wip:

* [ ] Fix the build (dependencies:634 map for option)

* fix: Cargo build

* fix: Long running wasn't starting

* fix: uninstall works

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* chore: Fix a dbg!

* chore: Make the commands of the docker-inject do inject instead of exec

* chore: Fix compile mistake

* chore: Change to use simpler

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* wip: making the mananger create

* wip: Working on trying to make the long running docker container command

* Feat/long running (#1676)

* feat: Start the long running container

* feat: Long running docker, running, stoping, and uninstalling

* feat: Just make the folders that we would like to mount.

* fix: Uninstall not working

* chore: remove some logging

* feat: Smarter cleanup

* feat: Wait for start

* wip: Need to kill

* chore: Remove the bad tracing

* feat: Stopping the long running processes without killing the long
running

* Mino Feat: Change the Manifest To have a new type (#1736)

* Add build-essential to README.md (#1716)

Update README.md

* write image to sparse-aware archive format (#1709)

* fix: Add modification to the max_user_watches (#1695)

* fix: Add modification to the max_user_watches

* chore: Move to initialization

* [Feat] follow logs (#1714)

* tail logs

* add cli

* add FE

* abstract http to shared

* batch new logs

* file download for logs

* fix modal error when no config

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: BluJ <mogulslayer@gmail.com>

* Update README.md (#1728)

* fix build for patch-db client for consistency (#1722)

* fix cli install (#1720)

* highlight instructions if not viewed (#1731)

* wip:

* [ ] Fix the build (dependencies:634 map for option)

* fix: Cargo build

* fix: Long running wasn't starting

* fix: uninstall works

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* chore: Fix a dbg!

* chore: Make the commands of the docker-inject do inject instead of exec

* chore: Fix compile mistake

* chore: Change to use simpler

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* feat: Use the long running feature in the manager

* remove recovered services and drop reordering feature (#1829)

* wip: Need to get the initial docker command running?

* chore: Add in the new procedure for the docker.

* feat: Get the system to finally run long

* wip: Added the command inserter to the docker persistance

* wip: Added the command inserter to the docker persistance

* Feat/long running (#1676)

* feat: Start the long running container

* feat: Long running docker, running, stoping, and uninstalling

* feat: Just make the folders that we would like to mount.

* fix: Uninstall not working

* chore: remove some logging

* feat: Smarter cleanup

* feat: Wait for start

* wip: Need to kill

* chore: Remove the bad tracing

* feat: Stopping the long running processes without killing the long
running

* Mino Feat: Change the Manifest To have a new type (#1736)

* Add build-essential to README.md (#1716)

Update README.md

* write image to sparse-aware archive format (#1709)

* fix: Add modification to the max_user_watches (#1695)

* fix: Add modification to the max_user_watches

* chore: Move to initialization

* [Feat] follow logs (#1714)

* tail logs

* add cli

* add FE

* abstract http to shared

* batch new logs

* file download for logs

* fix modal error when no config

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: BluJ <mogulslayer@gmail.com>

* Update README.md (#1728)

* fix build for patch-db client for consistency (#1722)

* fix cli install (#1720)

* highlight instructions if not viewed (#1731)

* wip:

* [ ] Fix the build (dependencies:634 map for option)

* fix: Cargo build

* fix: Long running wasn't starting

* fix: uninstall works

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* chore: Fix a dbg!

* chore: Make the commands of the docker-inject do inject instead of exec

* chore: Fix compile mistake

* chore: Change to use simpler

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* remove recovered services and drop reordering feature (#1829)

* chore: Convert the migration to use receipt. (#1842)

* feat: remove ionic storage (#1839)

* feat: remove ionic storage

* grayscal when disconncted, rename local storage service for clarity

* remove storage from package lock

* update patchDB

Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>

* update patchDB

* feat: Move the run_command into the js

* Feat/long running (#1676)

* feat: Start the long running container

* feat: Long running docker, running, stoping, and uninstalling

* feat: Just make the folders that we would like to mount.

* fix: Uninstall not working

* chore: remove some logging

* feat: Smarter cleanup

* feat: Wait for start

* wip: Need to kill

* chore: Remove the bad tracing

* feat: Stopping the long running processes without killing the long
running

* Mino Feat: Change the Manifest To have a new type (#1736)

* Add build-essential to README.md (#1716)

Update README.md

* write image to sparse-aware archive format (#1709)

* fix: Add modification to the max_user_watches (#1695)

* fix: Add modification to the max_user_watches

* chore: Move to initialization

* [Feat] follow logs (#1714)

* tail logs

* add cli

* add FE

* abstract http to shared

* batch new logs

* file download for logs

* fix modal error when no config

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: BluJ <mogulslayer@gmail.com>

* Update README.md (#1728)

* fix build for patch-db client for consistency (#1722)

* fix cli install (#1720)

* highlight instructions if not viewed (#1731)

* wip:

* [ ] Fix the build (dependencies:634 map for option)

* fix: Cargo build

* fix: Long running wasn't starting

* fix: uninstall works

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* chore: Fix a dbg!

* chore: Make the commands of the docker-inject do inject instead of exec

* chore: Fix compile mistake

* chore: Change to use simpler

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>

* remove recovered services and drop reordering feature (#1829)

* chore: Convert the migration to use receipt. (#1842)

* feat: remove ionic storage (#1839)

* feat: remove ionic storage

* grayscal when disconncted, rename local storage service for clarity

* remove storage from package lock

* update patchDB

Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>

* update patch DB

* chore: Change the error catching for the long running to try all

* Feat/community marketplace (#1790)

* add community marketplace

* Update embassy-mock-api.service.ts

* expect ui/marketplace to be undefined

* possible undefined from getpackage

* fix marketplace pages

* rework marketplace infrastructure

* fix bugs

Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>

* WIP: Fix the build, needed to move around creation of exec

* wip: Working on solving why there is a missing end.

* fix: make `shared` module independent of `config.js` (#1870)

* feat: Add in the kill and timeout

* feat: Get the run to actually work.

* chore: Add when/ why/ where comments

* feat: Convert inject main to use exec main.

* Fix: Ability to stop services

* wip: long running js main

* feat: Kill for the main

* Fix

* fix: Fix the build for x86

* wip: Working on changes

* wip: Working on trying to kill js

* fix: Testing for slow

* feat: Test that the new manifest works

* chore: Try and fix build?

* chore: Fix? the build

* chore: Fix the long input dies and never restarts

* build improvements

* no workdir

* fix: Architecture for long running

* chore: Fix and remove the docker inject

* chore: Undo the changes to the kiosk mode

* fix: Remove the it from the prod build

* fix: Start issue

* fix: The compat build

* chore: Add in the conditional compilation again for the missing impl

* chore: Change to aux

* chore: Remove the aux for now

* chore: Add some documentation to docker container

Co-authored-by: Chris Guida <chrisguida@users.noreply.github.com>
Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com>
Co-authored-by: Aiden McClelland <me@drbonez.dev>
Co-authored-by: Matt Hill <matthewonthemoon@gmail.com>
Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com>
Co-authored-by: Matt Hill <MattDHill@users.noreply.github.com>
Co-authored-by: Alex Inkin <alexander@inkin.ru>
This commit is contained in:
J M
2022-10-25 17:18:49 -06:00
committed by Aiden McClelland
parent 26d2152a36
commit 2642ec85e5
46 changed files with 2466 additions and 1050 deletions

3
.gitignore vendored
View File

@@ -1,5 +1,8 @@
.DS_Store
.idea
system-images/binfmt/binfmt.tar
system-images/compat/compat.tar
system-images/util/util.tar
/*.img
/*.img.gz
/*.img.xz

View File

@@ -1,7 +1,7 @@
ARCH = $(shell uname -m)
ENVIRONMENT_FILE = $(shell ./check-environment.sh)
GIT_HASH_FILE = $(shell ./check-git-hash.sh)
EMBASSY_BINS := backend/target/$(ARCH)-unknown-linux-gnu/release/embassyd backend/target/$(ARCH)-unknown-linux-gnu/release/embassy-init backend/target/$(ARCH)-unknown-linux-gnu/release/embassy-cli backend/target/$(ARCH)-unknown-linux-gnu/release/embassy-sdk backend/target/$(ARCH)-unknown-linux-gnu/release/avahi-alias
EMBASSY_BINS := backend/target/$(ARCH)-unknown-linux-gnu/release/embassyd backend/target/$(ARCH)-unknown-linux-gnu/release/embassy-init backend/target/$(ARCH)-unknown-linux-gnu/release/embassy-cli backend/target/$(ARCH)-unknown-linux-gnu/release/embassy-sdk backend/target/$(ARCH)-unknown-linux-gnu/release/avahi-alias libs/target/aarch64-unknown-linux-musl/release/embassy_container_init libs/target/x86_64-unknown-linux-musl/release/embassy_container_init
EMBASSY_UIS := frontend/dist/ui frontend/dist/setup-wizard frontend/dist/diagnostic-ui
EMBASSY_SRC := backend/embassyd.service backend/embassy-init.service $(EMBASSY_UIS) $(shell find build)
COMPAT_SRC := $(shell find system-images/compat/ -not -path 'system-images/compat/target/*' -and -not -name *.tar -and -not -name target)
@@ -75,6 +75,10 @@ install: all
cp ENVIRONMENT.txt $(DESTDIR)/usr/lib/embassy/
cp GIT_HASH.txt $(DESTDIR)/usr/lib/embassy/
mkdir -p $(DESTDIR)/usr/lib/embassy/container
cp libs/target/aarch64-unknown-linux-musl/release/embassy_container_init $(DESTDIR)/usr/lib/embassy/container/embassy_container_init.arm64
cp libs/target/x86_64-unknown-linux-musl/release/embassy_container_init $(DESTDIR)/usr/lib/embassy/container/embassy_container_init.amd64
mkdir -p $(DESTDIR)/usr/lib/embassy/system-images
cp system-images/compat/docker-images/aarch64.tar $(DESTDIR)/usr/lib/embassy/system-images/compat.tar
cp system-images/utils/docker-images/$(ARCH).tar $(DESTDIR)/usr/lib/embassy/system-images/utils.tar

26
backend/Cargo.lock generated
View File

@@ -1142,6 +1142,7 @@ dependencies = [
"divrem",
"ed25519",
"ed25519-dalek",
"embassy_container_init",
"emver",
"fd-lock-rs",
"futures",
@@ -1213,6 +1214,23 @@ dependencies = [
"uuid",
]
[[package]]
name = "embassy_container_init"
version = "0.1.0"
dependencies = [
"async-stream",
"color-eyre",
"futures",
"serde",
"serde_json",
"tokio",
"tokio-stream",
"tracing",
"tracing-error 0.2.0",
"tracing-futures",
"tracing-subscriber 0.3.15",
]
[[package]]
name = "emver"
version = "0.1.6"
@@ -2020,10 +2038,12 @@ dependencies = [
name = "js_engine"
version = "0.1.0"
dependencies = [
"async-trait",
"dashmap",
"deno_ast",
"deno_core",
"dprint-swc-ext",
"embassy_container_init",
"helpers",
"models",
"reqwest",
@@ -2344,11 +2364,13 @@ dependencies = [
name = "models"
version = "0.1.0"
dependencies = [
"embassy_container_init",
"emver",
"patch-db",
"rand 0.8.5",
"serde",
"thiserror",
"tokio",
]
[[package]]
@@ -4645,9 +4667,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.9"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",

View File

@@ -75,6 +75,7 @@ fd-lock-rs = "0.1.4"
futures = "0.3.21"
git-version = "0.3.5"
helpers = { path = "../libs/helpers" }
embassy_container_init = { path = "../libs/embassy-container-init" }
hex = "0.4.3"
hmac = "0.12.1"
http = "0.2.8"

View File

@@ -17,7 +17,8 @@ if tty -s; then
USE_TTY="-it"
fi
alias 'rust-arm64-builder'='docker run $USE_TTY --rm -v "$HOME/.cargo/registry":/root/.cargo/registry -v "$(pwd)":/home/rust/src -P start9/rust-arm-cross:aarch64'
alias 'rust-gnu-builder'='docker run $USE_TTY --rm -v "$HOME/.cargo/registry":/root/.cargo/registry -v "$(pwd)":/home/rust/src -P start9/rust-arm-cross:aarch64'
alias 'rust-musl-builder'='docker run $USE_TTY --rm -v "$HOME/.cargo/registry":/root/.cargo/registry -v "$(pwd)":/home/rust/src -P messense/rust-musl-cross:$ARCH-musl'
cd ..
FLAGS=""
@@ -27,15 +28,44 @@ fi
if [[ "$ENVIRONMENT" =~ (^|-)dev($|-) ]]; then
FLAGS="dev,$FLAGS"
fi
set +e
fail=
if [[ "$FLAGS" = "" ]]; then
rust-arm64-builder sh -c "(git config --global --add safe.directory '*'; cd backend && cargo build --release --locked --target=$ARCH-unknown-linux-gnu)"
rust-gnu-builder sh -c "(git config --global --add safe.directory '*'; cd backend && cargo build --release --locked --target=$ARCH-unknown-linux-gnu)"
if test $? -ne 0; then
fail=true
fi
for ARCH in x86_64 aarch64
do
rust-musl-builder sh -c "(git config --global --add safe.directory '*'; cd libs && cargo build --release --locked --bin embassy_container_init )"
if test $? -ne 0; then
fail=true
fi
done
else
echo "FLAGS=$FLAGS"
rust-arm64-builder sh -c "(git config --global --add safe.directory '*'; cd backend && cargo build --release --features $FLAGS --locked --target=$ARCH-unknown-linux-gnu)"
rust-gnu-builder sh -c "(git config --global --add safe.directory '*'; cd backend && cargo build --release --features $FLAGS --locked --target=$ARCH-unknown-linux-gnu)"
if test $? -ne 0; then
fail=true
fi
for ARCH in x86_64 aarch64
do
rust-musl-builder sh -c "(git config --global --add safe.directory '*'; cd libs && cargo build --release --features $FLAGS --locked --bin embassy_container_init)"
if test $? -ne 0; then
fail=true
fi
done
fi
set -e
cd backend
sudo chown -R $USER target
sudo chown -R $USER ~/.cargo
sudo chown -R $USER ../libs/target
if [-n fail]; then
exit 1
fi
#rust-arm64-builder aarch64-linux-gnu-strip target/aarch64-unknown-linux-gnu/release/embassyd

View File

@@ -11,7 +11,7 @@ use tracing::instrument;
use crate::config::{Config, ConfigSpec};
use crate::context::RpcContext;
use crate::id::ImageId;
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::{PackageProcedure, ProcedureName};
use crate::s9pk::manifest::PackageId;
use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFormat};
@@ -59,7 +59,7 @@ impl Action {
#[instrument]
pub fn validate(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
@@ -78,7 +78,6 @@ impl Action {
pub async fn execute(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
pkg_id: &PackageId,
pkg_version: &Version,
action_id: &ActionId,
@@ -93,7 +92,6 @@ impl Action {
self.implementation
.execute(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::Action(action_id.clone()),
@@ -145,23 +143,10 @@ pub async fn action(
.await?
.to_owned();
let container = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&pkg_id)
.and_then(|p| p.installed())
.expect(&mut db)
.await
.with_kind(crate::ErrorKind::NotFound)?
.manifest()
.container()
.get(&mut db, false)
.await?
.to_owned();
if let Some(action) = manifest.actions.0.get(&action_id) {
action
.execute(
&ctx,
&container,
&manifest.id,
&manifest.version,
&action_id,

View File

@@ -19,7 +19,7 @@ use crate::dependencies::reconfigure_dependents_with_live_pointers;
use crate::id::ImageId;
use crate::install::PKG_ARCHIVE_DIR;
use crate::net::interface::{InterfaceId, Interfaces};
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::{NoOutput, PackageProcedure, ProcedureName};
use crate::s9pk::manifest::PackageId;
use crate::util::serde::IoFormat;
@@ -74,7 +74,7 @@ pub struct BackupActions {
impl BackupActions {
pub fn validate(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
@@ -102,25 +102,12 @@ impl BackupActions {
let mut volumes = volumes.to_readonly();
volumes.insert(VolumeId::Backup, Volume::Backup { readonly: false });
let backup_dir = backup_dir(pkg_id);
let container = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&pkg_id)
.and_then(|p| p.installed())
.expect(db)
.await
.with_kind(crate::ErrorKind::NotFound)?
.manifest()
.container()
.get(db, false)
.await?
.to_owned();
if tokio::fs::metadata(&backup_dir).await.is_err() {
tokio::fs::create_dir_all(&backup_dir).await?
}
self.create
.execute::<(), NoOutput>(
ctx,
&container,
pkg_id,
pkg_version,
ProcedureName::CreateBackup,
@@ -200,7 +187,6 @@ impl BackupActions {
#[instrument(skip(ctx, db, secrets))]
pub async fn restore<Ex, Db: DbHandle>(
&self,
container: &Option<DockerContainer>,
ctx: &RpcContext,
db: &mut Db,
secrets: &mut Ex,
@@ -217,7 +203,6 @@ impl BackupActions {
self.restore
.execute::<(), NoOutput>(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::RestoreBackup,

View File

@@ -10,7 +10,7 @@ use super::{Config, ConfigSpec};
use crate::context::RpcContext;
use crate::dependencies::Dependencies;
use crate::id::ImageId;
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::{PackageProcedure, ProcedureName};
use crate::s9pk::manifest::PackageId;
use crate::status::health_check::HealthCheckId;
@@ -34,7 +34,7 @@ impl ConfigActions {
#[instrument]
pub fn validate(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
@@ -51,7 +51,6 @@ impl ConfigActions {
pub async fn get(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
pkg_id: &PackageId,
pkg_version: &Version,
volumes: &Volumes,
@@ -59,7 +58,6 @@ impl ConfigActions {
self.get
.execute(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::GetConfig,
@@ -77,7 +75,6 @@ impl ConfigActions {
pub async fn set(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
pkg_id: &PackageId,
pkg_version: &Version,
dependencies: &Dependencies,
@@ -88,7 +85,6 @@ impl ConfigActions {
.set
.execute(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::SetConfig,

View File

@@ -21,7 +21,7 @@ use crate::dependencies::{
DependencyErrors, DependencyReceipt, TaggedDependencyError, TryHealReceipts,
};
use crate::install::cleanup::{remove_from_current_dependents_lists, UpdateDependencyReceipts};
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::util::display_none;
use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFormat};
@@ -168,7 +168,6 @@ pub struct ConfigGetReceipts {
manifest_volumes: LockReceipt<crate::volume::Volumes, ()>,
manifest_version: LockReceipt<crate::util::Version, ()>,
manifest_config: LockReceipt<Option<ConfigActions>, ()>,
docker_container: LockReceipt<DockerContainer, String>,
}
impl ConfigGetReceipts {
@@ -204,19 +203,11 @@ impl ConfigGetReceipts {
.map(|x| x.manifest().config())
.make_locker(LockType::Write)
.add_to_keys(locks);
let docker_container = crate::db::DatabaseModel::new()
.package_data()
.star()
.installed()
.and_then(|x| x.manifest().container())
.make_locker(LockType::Write)
.add_to_keys(locks);
move |skeleton_key| {
Ok(Self {
manifest_volumes: manifest_volumes.verify(skeleton_key)?,
manifest_version: manifest_version.verify(skeleton_key)?,
manifest_config: manifest_config.verify(skeleton_key)?,
docker_container: docker_container.verify(skeleton_key)?,
})
}
}
@@ -239,11 +230,9 @@ pub async fn get(
.await?
.ok_or_else(|| Error::new(eyre!("{} has no config", id), crate::ErrorKind::NotFound))?;
let container = receipts.docker_container.get(&mut db, &id).await?;
let volumes = receipts.manifest_volumes.get(&mut db).await?;
let version = receipts.manifest_version.get(&mut db).await?;
action.get(&ctx, &container, &id, &version, &volumes).await
action.get(&ctx, &id, &version, &volumes).await
}
#[command(
@@ -286,7 +275,7 @@ pub struct ConfigReceipts {
pub current_dependencies: LockReceipt<CurrentDependencies, String>,
dependency_errors: LockReceipt<DependencyErrors, String>,
manifest_dependencies_config: LockReceipt<DependencyConfig, (String, String)>,
docker_container: LockReceipt<DockerContainer, String>,
docker_containers: LockReceipt<DockerContainers, String>,
}
impl ConfigReceipts {
@@ -391,11 +380,11 @@ impl ConfigReceipts {
.and_then(|x| x.manifest().dependencies().star().config())
.make_locker(LockType::Write)
.add_to_keys(locks);
let docker_container = crate::db::DatabaseModel::new()
let docker_containers = crate::db::DatabaseModel::new()
.package_data()
.star()
.installed()
.and_then(|x| x.manifest().container())
.and_then(|x| x.manifest().containers())
.make_locker(LockType::Write)
.add_to_keys(locks);
@@ -417,7 +406,7 @@ impl ConfigReceipts {
current_dependencies: current_dependencies.verify(skeleton_key)?,
dependency_errors: dependency_errors.verify(skeleton_key)?,
manifest_dependencies_config: manifest_dependencies_config.verify(skeleton_key)?,
docker_container: docker_container.verify(skeleton_key)?,
docker_containers: docker_containers.verify(skeleton_key)?,
})
}
}
@@ -509,8 +498,6 @@ pub fn configure_rec<'a, Db: DbHandle>(
receipts: &'a ConfigReceipts,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
let container = receipts.docker_container.get(db, &id).await?;
let container = &container;
// fetch data from db
let action = receipts
.config_actions
@@ -534,7 +521,7 @@ pub fn configure_rec<'a, Db: DbHandle>(
let ConfigRes {
config: old_config,
spec,
} = action.get(ctx, container, id, &version, &volumes).await?;
} = action.get(ctx, id, &version, &volumes).await?;
// determine new config to use
let mut config = if let Some(config) = config.or_else(|| old_config.clone()) {
@@ -602,15 +589,7 @@ pub fn configure_rec<'a, Db: DbHandle>(
let signal = if !dry_run {
// run config action
let res = action
.set(
ctx,
container,
id,
&version,
&dependencies,
&volumes,
&config,
)
.set(ctx, id, &version, &dependencies, &volumes, &config)
.await?;
// track dependencies with no pointers
@@ -702,7 +681,7 @@ pub fn configure_rec<'a, Db: DbHandle>(
.unwrap_or_default();
let next = Value::Object(config.clone());
for (dependent, dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) {
let dependent_container = receipts.docker_container.get(db, &dependent).await?;
let dependent_container = receipts.docker_containers.get(db, &dependent).await?;
let dependent_container = &dependent_container;
// check if config passes dependent check
if let Some(cfg) = receipts

View File

@@ -25,7 +25,6 @@ use super::{Config, MatchError, NoMatchWithPath, TimeoutError, TypeOf};
use crate::config::ConfigurationError;
use crate::context::RpcContext;
use crate::net::interface::InterfaceId;
use crate::procedure::docker::DockerContainer;
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::Error;
@@ -1883,7 +1882,6 @@ pub struct ConfigPointerReceipts {
manifest_volumes: LockReceipt<crate::volume::Volumes, String>,
manifest_version: LockReceipt<crate::util::Version, String>,
config_actions: LockReceipt<super::action::ConfigActions, String>,
docker_container: LockReceipt<DockerContainer, String>,
}
impl ConfigPointerReceipts {
@@ -1920,20 +1918,12 @@ impl ConfigPointerReceipts {
.and_then(|x| x.manifest().config())
.make_locker(LockType::Read)
.add_to_keys(locks);
let docker_container = crate::db::DatabaseModel::new()
.package_data()
.star()
.installed()
.and_then(|x| x.manifest().container())
.make_locker(LockType::Write)
.add_to_keys(locks);
move |skeleton_key| {
Ok(Self {
interface_addresses_receipt: interface_addresses_receipt(skeleton_key)?,
manifest_volumes: manifest_volumes.verify(skeleton_key)?,
config_actions: config_actions.verify(skeleton_key)?,
manifest_version: manifest_version.verify(skeleton_key)?,
docker_container: docker_container.verify(skeleton_key)?,
})
}
}
@@ -1963,12 +1953,11 @@ impl ConfigPointer {
let version = receipts.manifest_version.get(db, id).await.ok().flatten();
let cfg_actions = receipts.config_actions.get(db, id).await.ok().flatten();
let volumes = receipts.manifest_volumes.get(db, id).await.ok().flatten();
let container = receipts.docker_container.get(db, id).await.ok().flatten();
if let (Some(version), Some(cfg_actions), Some(volumes)) =
(&version, &cfg_actions, &volumes)
{
let cfg_res = cfg_actions
.get(ctx, &container, &self.package_id, version, volumes)
.get(ctx, &self.package_id, version, volumes)
.await
.map_err(|e| ConfigurationError::SystemError(e))?;
if let Some(cfg) = cfg_res.config {

View File

@@ -19,7 +19,7 @@ use crate::config::spec::PackagePointerSpec;
use crate::config::{not_found, Config, ConfigReceipts, ConfigSpec};
use crate::context::RpcContext;
use crate::db::model::{CurrentDependencies, CurrentDependents, InstalledPackageDataEntry};
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::{NoOutput, PackageProcedure, ProcedureName};
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::status::health_check::{HealthCheckId, HealthCheckResult};
@@ -64,7 +64,7 @@ pub struct TryHealReceipts {
manifest_version: LockReceipt<Version, String>,
current_dependencies: LockReceipt<CurrentDependencies, String>,
dependency_errors: LockReceipt<DependencyErrors, String>,
docker_container: LockReceipt<DockerContainer, String>,
docker_containers: LockReceipt<DockerContainers, String>,
}
impl TryHealReceipts {
@@ -112,11 +112,11 @@ impl TryHealReceipts {
.map(|x| x.status().dependency_errors())
.make_locker(LockType::Write)
.add_to_keys(locks);
let docker_container = crate::db::DatabaseModel::new()
let docker_containers = crate::db::DatabaseModel::new()
.package_data()
.star()
.installed()
.and_then(|x| x.manifest().container())
.and_then(|x| x.manifest().containers())
.make_locker(LockType::Write)
.add_to_keys(locks);
move |skeleton_key| {
@@ -126,7 +126,7 @@ impl TryHealReceipts {
current_dependencies: current_dependencies.verify(skeleton_key)?,
manifest: manifest.verify(skeleton_key)?,
dependency_errors: dependency_errors.verify(skeleton_key)?,
docker_container: docker_container.verify(skeleton_key)?,
docker_containers: docker_containers.verify(skeleton_key)?,
})
}
}
@@ -203,7 +203,7 @@ impl DependencyError {
receipts: &'a TryHealReceipts,
) -> BoxFuture<'a, Result<Option<Self>, Error>> {
async move {
let container = receipts.docker_container.get(db, id).await?;
let container = receipts.docker_containers.get(db, id).await?;
Ok(match self {
DependencyError::NotInstalled => {
if receipts.status.get(db, dependency).await?.is_some() {
@@ -251,7 +251,6 @@ impl DependencyError {
cfg_info
.get(
ctx,
&container,
dependency,
&dependency_manifest.version,
&dependency_manifest.volumes,
@@ -507,7 +506,7 @@ impl DependencyConfig {
pub async fn check(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
dependent_id: &PackageId,
dependent_version: &Version,
dependent_volumes: &Volumes,
@@ -532,7 +531,7 @@ impl DependencyConfig {
pub async fn auto_configure(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
dependent_id: &PackageId,
dependent_version: &Version,
dependent_volumes: &Volumes,
@@ -562,7 +561,7 @@ pub struct DependencyConfigReceipts {
dependency_config_action: LockReceipt<ConfigActions, ()>,
package_volumes: LockReceipt<Volumes, ()>,
package_version: LockReceipt<Version, ()>,
docker_container: LockReceipt<DockerContainer, String>,
docker_containers: LockReceipt<DockerContainers, String>,
}
impl DependencyConfigReceipts {
@@ -625,11 +624,11 @@ impl DependencyConfigReceipts {
.map(|x| x.manifest().version())
.make_locker(LockType::Write)
.add_to_keys(locks);
let docker_container = crate::db::DatabaseModel::new()
let docker_containers = crate::db::DatabaseModel::new()
.package_data()
.star()
.installed()
.and_then(|x| x.manifest().container())
.and_then(|x| x.manifest().containers())
.make_locker(LockType::Write)
.add_to_keys(locks);
move |skeleton_key| {
@@ -641,7 +640,7 @@ impl DependencyConfigReceipts {
dependency_config_action: dependency_config_action.verify(&skeleton_key)?,
package_volumes: package_volumes.verify(&skeleton_key)?,
package_version: package_version.verify(&skeleton_key)?,
docker_container: docker_container.verify(&skeleton_key)?,
docker_containers: docker_containers.verify(&skeleton_key)?,
})
}
}
@@ -716,8 +715,7 @@ pub async fn configure_logic(
let dependency_version = receipts.dependency_version.get(db).await?;
let dependency_volumes = receipts.dependency_volumes.get(db).await?;
let dependencies = receipts.dependencies.get(db).await?;
let dependency_docker_container = receipts.docker_container.get(db, &*dependency_id).await?;
let pkg_docker_container = receipts.docker_container.get(db, &*pkg_id).await?;
let pkg_docker_container = receipts.docker_containers.get(db, &*pkg_id).await?;
let dependency = dependencies
.0
@@ -750,7 +748,6 @@ pub async fn configure_logic(
} = dependency_config_action
.get(
&ctx,
&dependency_docker_container,
&dependency_id,
&dependency_version,
&dependency_volumes,

View File

@@ -1,10 +1,6 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use bollard::image::ListImagesOptions;
use color_eyre::Report;
use futures::FutureExt;
use patch_db::{DbHandle, LockReceipt, LockTargetId, LockType, PatchDbHandle, Verifier};
use sqlx::{Executor, Postgres};
use tracing::instrument;
@@ -422,7 +418,7 @@ pub fn cleanup_folder(
Box::pin(async move {
let meta_data = match tokio::fs::metadata(&path).await {
Ok(a) => a,
Err(e) => {
Err(_e) => {
return;
}
};
@@ -441,7 +437,7 @@ pub fn cleanup_folder(
}
let mut read_dir = match tokio::fs::read_dir(&path).await {
Ok(a) => a,
Err(e) => {
Err(_e) => {
return;
}
};

View File

@@ -1318,7 +1318,6 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
.manifest
.migrations
.to(
&prev.manifest.container,
ctx,
version,
pkg_id,
@@ -1329,7 +1328,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
let migration = manifest
.migrations
.from(
&manifest.container,
&manifest.containers,
ctx,
&prev.manifest.version,
pkg_id,
@@ -1413,7 +1412,6 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
manifest
.backup
.restore(
&manifest.container,
ctx,
&mut tx,
&mut sql_tx,
@@ -1522,7 +1520,7 @@ async fn handle_recovered_package(
receipts: &ConfigReceipts,
) -> Result<(), Error> {
let configured = if let Some(migration) = manifest.migrations.from(
&manifest.container,
&manifest.containers,
ctx,
&recovered.version,
pkg_id,

View File

@@ -115,7 +115,7 @@ pub async fn check<Db: DbHandle>(
.health_checks
.check_all(
ctx,
&manifest.container,
&manifest.containers,
started,
id,
&manifest.version,

View File

@@ -10,15 +10,17 @@ use std::time::Duration;
use bollard::container::{KillContainerOptions, StopContainerOptions};
use chrono::Utc;
use color_eyre::eyre::eyre;
use embassy_container_init::{InputJsonRpc, RpcId};
use models::{ExecCommand, TermCommand};
use nix::sys::signal::Signal;
use num_enum::TryFromPrimitive;
use patch_db::DbHandle;
use sqlx::{Executor, Postgres};
use tokio::io::BufReader;
use tokio::sync::watch::error::RecvError;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::{Mutex, Notify, RwLock};
use tokio::task::JoinHandle;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
use tokio_stream::wrappers::UnboundedReceiverStream;
use torut::onion::TorSecretKeyV3;
use tracing::instrument;
@@ -27,7 +29,9 @@ use crate::manager::sync::synchronizer;
use crate::net::interface::InterfaceId;
use crate::net::GeneratedCertificateMountPoint;
use crate::notifications::NotificationLevel;
use crate::procedure::docker::{DockerContainer, DockerInject, DockerProcedure};
use crate::procedure::docker::{DockerContainer, DockerProcedure, LongRunning};
#[cfg(feature = "js_engine")]
use crate::procedure::js_scripts::JsProcedure;
use crate::procedure::{NoOutput, PackageProcedure, ProcedureName};
use crate::s9pk::manifest::{Manifest, PackageId};
use crate::status::MainStatus;
@@ -191,14 +195,15 @@ async fn run_main(
let generated_certificate = generate_certificate(state, &interfaces).await?;
persistant.wait_for_persistant().await;
let is_injectable_main = check_is_injectable_main(&state);
let mut runtime = match is_injectable_main {
true => {
tokio::spawn(
async move { start_up_inject_image(rt_state, generated_certificate).await },
)
let is_injectable_main = check_is_injectable_main(state);
let mut runtime = match injectable_main(state) {
InjectableMain::None => {
tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await })
}
#[cfg(feature = "js_engine")]
InjectableMain::Script(_) => {
tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await })
}
false => tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await }),
};
let ip = match is_injectable_main {
false => Some(match get_running_ip(state, &mut runtime).await {
@@ -219,6 +224,7 @@ async fn run_main(
let res = tokio::select! {
a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).and_then(|a| a),
_ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)),
};
if let Some(ip) = ip {
remove_network_for_main(state, ip).await?;
@@ -237,29 +243,6 @@ async fn start_up_image(
.main
.execute::<(), NoOutput>(
&rt_state.ctx,
&rt_state.manifest.container,
&rt_state.manifest.id,
&rt_state.manifest.version,
ProcedureName::Main,
&rt_state.manifest.volumes,
None,
None,
)
.await
}
/// We want to start up the manifest, but in this case we want to know that we have generated the certificates.
/// Note for _generated_certificate: Needed to know that before we start the state we have generated the certificate
async fn start_up_inject_image(
rt_state: Arc<ManagerSharedState>,
_generated_certificate: GeneratedCertificateMountPoint,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
rt_state
.manifest
.main
.inject::<(), NoOutput>(
&rt_state.ctx,
&rt_state.manifest.container,
&rt_state.manifest.id,
&rt_state.manifest.version,
ProcedureName::Main,
@@ -295,8 +278,8 @@ impl Manager {
let managers_persistant = persistant_container.clone();
let thread = tokio::spawn(async move {
tokio::select! {
_ = manager_thread_loop(recv, &thread_shared, managers_persistant) => (),
_ = synchronizer(&*thread_shared) => (),
_ = manager_thread_loop(recv, &thread_shared, managers_persistant.clone()) => (),
_ = synchronizer(&*thread_shared, managers_persistant) => (),
}
});
Ok(Manager {
@@ -348,16 +331,21 @@ impl Manager {
.commit_health_check_results
.store(false, Ordering::SeqCst);
let _ = self.shared.on_stop.send(OnStop::Exit);
let sigterm_timeout: Option<crate::util::serde::Duration> = match &self.shared.manifest.main
let sigterm_timeout: Option<crate::util::serde::Duration> = match self
.shared
.manifest
.containers
.as_ref()
.map(|x| x.main.sigterm_timeout)
{
PackageProcedure::Docker(DockerProcedure {
sigterm_timeout, ..
})
| PackageProcedure::DockerInject(DockerInject {
sigterm_timeout, ..
}) => sigterm_timeout.clone(),
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => return Ok(()),
Some(a) => a,
None => match &self.shared.manifest.main {
PackageProcedure::Docker(DockerProcedure {
sigterm_timeout, ..
}) => *sigterm_timeout,
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => return Ok(()),
},
};
self.persistant_container.stop().await;
@@ -392,7 +380,11 @@ impl Manager {
a => a?,
};
} else {
stop_non_first(&*self.shared.container_name).await;
stop_long_running_processes(
&*self.shared.container_name,
self.persistant_container.command_inserter.clone(),
)
.await;
}
self.shared.status.store(
@@ -414,6 +406,13 @@ impl Manager {
self.shared.synchronize_now.notify_waiters();
self.shared.synchronized.notified().await
}
pub fn exec_command(&self) -> ExecCommand {
self.persistant_container.exec_command()
}
pub fn term_command(&self) -> TermCommand {
self.persistant_container.term_command()
}
}
async fn manager_thread_loop(
@@ -460,7 +459,7 @@ async fn manager_thread_loop(
);
}
}
match run_main(&thread_shared, persistant_container.clone()).await {
match run_main(thread_shared, persistant_container.clone()).await {
Ok(Ok(NoOutput)) => (), // restart
Ok(Err(e)) => {
let mut db = thread_shared.ctx.db.handle();
@@ -489,12 +488,9 @@ async fn manager_thread_loop(
Some(3600) // 1 hour
)
.await;
match res {
Err(e) => {
tracing::error!("Failed to issue notification: {}", e);
tracing::debug!("{:?}", e);
}
Ok(()) => {}
if let Err(e) = res {
tracing::error!("Failed to issue notification: {}", e);
tracing::debug!("{:?}", e);
}
}
_ => tracing::error!("service just started. not issuing crash notification"),
@@ -510,12 +506,121 @@ async fn manager_thread_loop(
}
}
struct PersistantContainer {
struct LongRunningHandle(NonDetachingJoinHandle<()>);
pub struct CommandInserter {
command_counter: AtomicUsize,
input: UnboundedSender<InputJsonRpc>,
outputs: Arc<Mutex<BTreeMap<RpcId, UnboundedSender<embassy_container_init::Output>>>>,
}
impl Drop for CommandInserter {
fn drop(&mut self) {
use embassy_container_init::{Input, JsonRpc};
let CommandInserter {
command_counter,
input,
outputs: _,
} = self;
let upper: usize = command_counter.load(Ordering::Relaxed);
for i in 0..upper {
let _ignored_result = input.send(JsonRpc::new(RpcId::UInt(i as u32), Input::Term()));
}
}
}
impl CommandInserter {
fn new(
long_running: LongRunning,
input: UnboundedSender<InputJsonRpc>,
) -> (Self, LongRunningHandle) {
let LongRunning {
mut output,
running_output,
} = long_running;
let command_counter = AtomicUsize::new(0);
let outputs: Arc<Mutex<BTreeMap<RpcId, UnboundedSender<embassy_container_init::Output>>>> =
Default::default();
let handle = LongRunningHandle(running_output);
tokio::spawn({
let outputs = outputs.clone();
async move {
while let Some(output) = output.recv().await {
let (id, output) = output.into_pair();
let mut outputs = outputs.lock().await;
let output_sender = outputs.get_mut(&id);
if let Some(output_sender) = output_sender {
if let Err(err) = output_sender.send(output) {
tracing::warn!("Could no longer send an output");
tracing::debug!("{err:?}");
outputs.remove(&id);
}
}
}
}
});
(
Self {
command_counter,
input,
outputs,
},
handle,
)
}
pub async fn exec_command(
&self,
command: String,
args: Vec<String>,
sender: UnboundedSender<embassy_container_init::Output>,
timeout: Option<Duration>,
) -> Option<RpcId> {
use embassy_container_init::{Input, JsonRpc};
let mut outputs = self.outputs.lock().await;
let command_counter = self.command_counter.fetch_add(1, Ordering::SeqCst) as u32;
let command_id = RpcId::UInt(command_counter);
outputs.insert(command_id.clone(), sender);
if let Some(timeout) = timeout {
tokio::spawn({
let input = self.input.clone();
let command_id = command_id.clone();
async move {
tokio::time::sleep(timeout).await;
let _ignored_output = input.send(JsonRpc::new(command_id, Input::Kill()));
}
});
}
if let Err(err) = self.input.send(JsonRpc::new(
command_id.clone(),
Input::Command { command, args },
)) {
tracing::warn!("Trying to send to input but can't");
tracing::debug!("{err:?}");
return None;
}
Some(command_id)
}
pub async fn term(&self, id: RpcId) {
use embassy_container_init::{Input, JsonRpc};
self.outputs.lock().await.remove(&id);
let _ignored_term = self.input.send(JsonRpc::new(id, Input::Term()));
}
pub async fn term_all(&self) {
for i in 0..self.command_counter.load(Ordering::Relaxed) {
self.term(RpcId::UInt(i as u32)).await;
}
}
}
type RunningDocker =
Arc<Mutex<Option<NonDetachingJoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>>>>;
pub struct PersistantContainer {
container_name: String,
running_docker:
Arc<Mutex<Option<NonDetachingJoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>>>>,
running_docker: RunningDocker,
should_stop_running: Arc<std::sync::atomic::AtomicBool>,
wait_for_start: (Sender<bool>, Receiver<bool>),
command_inserter: Arc<Mutex<Option<CommandInserter>>>,
}
impl PersistantContainer {
@@ -526,7 +631,8 @@ impl PersistantContainer {
container_name: thread_shared.container_name.clone(),
running_docker: Arc::new(Mutex::new(None)),
should_stop_running: Arc::new(AtomicBool::new(false)),
wait_for_start: wait_for_start,
wait_for_start,
command_inserter: Default::default(),
});
tokio::spawn(persistant_container(
thread_shared.clone(),
@@ -542,12 +648,7 @@ impl PersistantContainer {
*running_docker = None;
use tokio::process::Command;
if let Err(_err) = Command::new("docker")
.args(["stop", "-t", "0", &*container_name])
.output()
.await
{}
if let Err(_err) = Command::new("docker")
.args(["kill", &*container_name])
.args(["stop", "-t", "30", container_name])
.output()
.await
{}
@@ -569,22 +670,83 @@ impl PersistantContainer {
async fn done_waiting(&self) {
self.wait_for_start.0.send(false).unwrap();
}
fn term_command(&self) -> TermCommand {
let cloned = self.command_inserter.clone();
Arc::new(move |id| {
let cloned = cloned.clone();
Box::pin(async move {
let lock = cloned.lock().await;
let _id = match &*lock {
Some(command_inserter) => command_inserter.term(id).await,
None => {
return Err("Couldn't get a command inserter in current service".to_string())
}
};
Ok::<(), String>(())
})
})
}
fn exec_command(&self) -> ExecCommand {
let cloned = self.command_inserter.clone();
/// A handle that on drop will clean all the ids that are inserter in the fn.
struct Cleaner {
command_inserter: Arc<Mutex<Option<CommandInserter>>>,
ids: ::std::collections::BTreeSet<RpcId>,
}
impl Drop for Cleaner {
fn drop(&mut self) {
let command_inserter = self.command_inserter.clone();
let ids = ::std::mem::take(&mut self.ids);
tokio::spawn(async move {
let command_inserter_lock = command_inserter.lock().await;
let command_inserter = match &*command_inserter_lock {
Some(a) => a,
None => {
return;
}
};
for id in ids {
command_inserter.term(id).await;
}
});
}
}
let cleaner = Arc::new(Mutex::new(Cleaner {
command_inserter: cloned.clone(),
ids: Default::default(),
}));
Arc::new(move |command, args, sender, timeout| {
let cloned = cloned.clone();
let cleaner = cleaner.clone();
Box::pin(async move {
let lock = cloned.lock().await;
let id = match &*lock {
Some(command_inserter) => {
if let Some(id) = command_inserter
.exec_command(command.clone(), args.clone(), sender, timeout)
.await
{
let mut cleaner = cleaner.lock().await;
cleaner.ids.insert(id.clone());
id
} else {
return Err("Couldn't get command started ".to_string());
}
}
None => {
return Err("Couldn't get a command inserter in current service".to_string())
}
};
Ok::<RpcId, String>(id)
})
})
}
}
impl Drop for PersistantContainer {
fn drop(&mut self) {
self.should_stop_running.store(true, Ordering::SeqCst);
let container_name = self.container_name.clone();
let running_docker = self.running_docker.clone();
tokio::spawn(async move {
let mut running_docker = running_docker.lock().await;
*running_docker = None;
use std::process::Command;
if let Err(_err) = Command::new("docker")
.args(["kill", &*container_name])
.output()
{}
});
}
}
@@ -594,12 +756,15 @@ async fn persistant_container(
) {
let main_docker_procedure_for_long = injectable_main(&thread_shared);
match main_docker_procedure_for_long {
Some(main) => loop {
InjectableMain::None => futures::future::pending().await,
#[cfg(feature = "js_engine")]
InjectableMain::Script((container_inject, procedure)) => loop {
let main = DockerProcedure::main_docker_procedure_js(container_inject, procedure);
if container.should_stop_running.load(Ordering::SeqCst) {
return;
}
container.start_wait().await;
match run_persistant_container(&thread_shared, container.clone(), main.clone()).await {
match run_persistant_container(&thread_shared, container.clone(), main).await {
Ok(_) => (),
Err(e) => {
tracing::error!("failed to start persistant container: {}", e);
@@ -607,60 +772,50 @@ async fn persistant_container(
}
}
},
None => futures::future::pending().await,
}
}
fn injectable_main(thread_shared: &Arc<ManagerSharedState>) -> Option<Arc<DockerProcedure>> {
if let (
PackageProcedure::DockerInject(DockerInject {
system,
entrypoint,
args,
io_format,
sigterm_timeout,
}),
Some(DockerContainer {
image,
mounts,
shm_size_mb,
}),
) = (
#[cfg(not(feature = "js_engine"))]
enum InjectableMain {
None,
}
#[cfg(feature = "js_engine")]
enum InjectableMain<'a> {
None,
Script((&'a DockerContainer, &'a JsProcedure)),
}
fn injectable_main(thread_shared: &Arc<ManagerSharedState>) -> InjectableMain {
match (
&thread_shared.manifest.main,
&thread_shared.manifest.container,
&thread_shared.manifest.containers.as_ref().map(|x| &x.main),
) {
Some(Arc::new(DockerProcedure {
image: image.clone(),
mounts: mounts.clone(),
io_format: *io_format,
shm_size_mb: *shm_size_mb,
sigterm_timeout: *sigterm_timeout,
system: *system,
entrypoint: "sleep".to_string(),
args: vec!["infinity".to_string()],
}))
} else {
None
#[cfg(feature = "js_engine")]
(PackageProcedure::Script(inject), Some(container)) => {
InjectableMain::Script((container, inject))
}
_ => InjectableMain::None,
}
}
fn check_is_injectable_main(thread_shared: &ManagerSharedState) -> bool {
match &thread_shared.manifest.main {
PackageProcedure::Docker(_a) => false,
PackageProcedure::DockerInject(a) => true,
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => false,
PackageProcedure::Script(_) => true,
}
}
async fn run_persistant_container(
state: &Arc<ManagerSharedState>,
persistant: Arc<PersistantContainer>,
docker_procedure: Arc<DockerProcedure>,
docker_procedure: DockerProcedure,
) -> Result<(), Error> {
let interfaces = states_main_interfaces(state)?;
let generated_certificate = generate_certificate(state, &interfaces).await?;
let mut runtime = tokio::spawn(long_running_docker(state.clone(), docker_procedure));
let mut runtime =
long_running_docker(state.clone(), docker_procedure, persistant.clone()).await?;
let ip = match get_running_ip(state, &mut runtime).await {
let ip = match get_long_running_ip(state, &mut runtime).await {
GetRunninIp::Ip(x) => x,
GetRunninIp::Error(e) => return Err(e),
GetRunninIp::EarlyExit(e) => {
@@ -674,7 +829,7 @@ async fn run_persistant_container(
fetch_starting_to_running(state);
let res = tokio::select! {
a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()),
a = runtime.0 => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()),
};
remove_network_for_main(state, ip).await?;
res
@@ -682,19 +837,23 @@ async fn run_persistant_container(
async fn long_running_docker(
rt_state: Arc<ManagerSharedState>,
main_status: Arc<DockerProcedure>,
) -> Result<Result<NoOutput, (i32, String)>, Error> {
main_status
.execute::<(), NoOutput>(
main_status: DockerProcedure,
container: Arc<PersistantContainer>,
) -> Result<LongRunningHandle, Error> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let long_running = main_status
.long_running_execute(
&rt_state.ctx,
&rt_state.manifest.id,
&rt_state.manifest.version,
ProcedureName::LongRunning,
&rt_state.manifest.volumes,
None,
None,
UnboundedReceiverStream::new(receiver),
)
.await
.await?;
let (command_inserter, long_running_handle) = CommandInserter::new(long_running, sender);
*container.command_inserter.lock().await = Some(command_inserter);
Ok(long_running_handle)
}
async fn remove_network_for_main(
@@ -778,9 +937,11 @@ enum GetRunninIp {
EarlyExit(Result<NoOutput, (i32, String)>),
}
type RuntimeOfCommand = JoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>;
async fn get_running_ip(
state: &Arc<ManagerSharedState>,
mut runtime: &mut tokio::task::JoinHandle<Result<Result<NoOutput, (i32, String)>, Error>>,
mut runtime: &mut RuntimeOfCommand,
) -> GetRunninIp {
loop {
match container_inspect(state).await {
@@ -805,8 +966,8 @@ async fn get_running_ip(
}) => (),
Err(e) => return GetRunninIp::Error(e.into()),
}
match futures::poll!(&mut runtime) {
Poll::Ready(res) => match res {
if let Poll::Ready(res) = futures::poll!(&mut runtime) {
match res {
Ok(Ok(response)) => return GetRunninIp::EarlyExit(response),
Err(_) | Ok(Err(_)) => {
return GetRunninIp::Error(Error::new(
@@ -814,8 +975,48 @@ async fn get_running_ip(
crate::ErrorKind::Docker,
))
}
},
_ => (),
}
}
}
}
async fn get_long_running_ip(
state: &Arc<ManagerSharedState>,
runtime: &mut LongRunningHandle,
) -> GetRunninIp {
loop {
match container_inspect(state).await {
Ok(res) => {
match res
.network_settings
.and_then(|ns| ns.networks)
.and_then(|mut n| n.remove("start9"))
.and_then(|es| es.ip_address)
.filter(|ip| !ip.is_empty())
.map(|ip| ip.parse())
.transpose()
{
Ok(Some(ip_addr)) => return GetRunninIp::Ip(ip_addr),
Ok(None) => (),
Err(e) => return GetRunninIp::Error(e.into()),
}
}
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
}) => (),
Err(e) => return GetRunninIp::Error(e.into()),
}
if let Poll::Ready(res) = futures::poll!(&mut runtime.0) {
match res {
Ok(_) => return GetRunninIp::EarlyExit(Ok(NoOutput)),
Err(_e) => {
return GetRunninIp::Error(Error::new(
eyre!("Manager runtime panicked!"),
crate::ErrorKind::Docker,
))
}
}
}
}
}
@@ -838,11 +1039,11 @@ async fn generate_certificate(
TorSecretKeyV3,
)>,
) -> Result<GeneratedCertificateMountPoint, Error> {
Ok(state
state
.ctx
.net_controller
.generate_certificate_mountpoint(&state.manifest.id, interfaces)
.await?)
.await
}
fn states_main_interfaces(
@@ -855,7 +1056,7 @@ fn states_main_interfaces(
)>,
Error,
> {
Ok(state
state
.manifest
.interfaces
.0
@@ -873,11 +1074,14 @@ fn states_main_interfaces(
.clone(),
))
})
.collect::<Result<Vec<_>, Error>>()?)
.collect::<Result<Vec<_>, Error>>()
}
#[instrument(skip(shared))]
async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
#[instrument(skip(shared, persistant_container))]
async fn stop(
shared: &ManagerSharedState,
persistant_container: Arc<PersistantContainer>,
) -> Result<(), Error> {
shared
.commit_health_check_results
.store(false, Ordering::SeqCst);
@@ -896,9 +1100,6 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
match &shared.manifest.main {
PackageProcedure::Docker(DockerProcedure {
sigterm_timeout, ..
})
| PackageProcedure::DockerInject(DockerInject {
sigterm_timeout, ..
}) => {
if !check_is_injectable_main(shared) {
match shared
@@ -930,11 +1131,23 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
a => a?,
};
} else {
stop_non_first(&shared.container_name).await;
stop_long_running_processes(
&shared.container_name,
persistant_container.command_inserter.clone(),
)
.await;
}
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => return Ok(()),
PackageProcedure::Script(_) => {
if check_is_injectable_main(shared) {
stop_long_running_processes(
&shared.container_name,
persistant_container.command_inserter.clone(),
)
.await;
}
}
};
tracing::debug!("Stopping a docker");
shared.status.store(
@@ -945,11 +1158,13 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> {
}
/// So the sleep infinity, which is the long running, is pid 1. So we kill the others
async fn stop_non_first(container_name: &str) {
// tracing::error!("BLUJ TODO: sudo docker exec {} sh -c \"ps ax | awk '\\$1 ~ /^[:0-9:]/ && \\$1 > 1 {{print \\$1}}' | xargs kill\"", container_name);
// (sleep infinity) & export RUNNING=$! && echo $! && (wait $RUNNING && echo "DONE FOR $RUNNING") &
// (RUNNING=$(sleep infinity & echo $!); echo "running $RUNNING"; wait $RUNNING; echo "DONE FOR ?") &
async fn stop_long_running_processes(
container_name: &str,
command_inserter: Arc<Mutex<Option<CommandInserter>>>,
) {
if let Some(command_inserter) = &*command_inserter.lock().await {
command_inserter.term_all().await;
}
let _ = tokio::process::Command::new("docker")
.args([
@@ -964,24 +1179,6 @@ async fn stop_non_first(container_name: &str) {
.await;
}
// #[test]
// fn test_stop_non_first() {
// assert_eq!(
// &format!(
// "{}",
// tokio::process::Command::new("docker").args([
// "container",
// "exec",
// "container_name",
// "sh",
// "-c",
// "ps ax | awk \"\\$1 ~ /^[:0-9:]/ && \\$1 > 1 {print \\$1}\"| xargs kill",
// ])
// ),
// ""
// );
// }
#[instrument(skip(shared))]
async fn start(shared: &ManagerSharedState) -> Result<(), Error> {
shared.on_stop.send(OnStop::Restart).map_err(|_| {
@@ -1002,8 +1199,11 @@ async fn start(shared: &ManagerSharedState) -> Result<(), Error> {
Ok(())
}
#[instrument(skip(shared))]
async fn pause(shared: &ManagerSharedState) -> Result<(), Error> {
#[instrument(skip(shared, persistant_container))]
async fn pause(
shared: &ManagerSharedState,
persistant_container: Arc<PersistantContainer>,
) -> Result<(), Error> {
if let Err(e) = shared
.ctx
.docker
@@ -1012,7 +1212,7 @@ async fn pause(shared: &ManagerSharedState) -> Result<(), Error> {
{
tracing::error!("failed to pause container. stopping instead. {}", e);
tracing::debug!("{:?}", e);
return stop(shared).await;
return stop(shared, persistant_container).await;
}
shared
.status

View File

@@ -1,16 +1,19 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{collections::BTreeMap, sync::Arc};
use chrono::Utc;
use super::{pause, resume, start, stop, ManagerSharedState, Status};
use super::{pause, resume, start, stop, ManagerSharedState, PersistantContainer, Status};
use crate::status::MainStatus;
use crate::Error;
/// Allocates a db handle. DO NOT CALL with a db handle already in scope
async fn synchronize_once(shared: &ManagerSharedState) -> Result<Status, Error> {
async fn synchronize_once(
shared: &ManagerSharedState,
persistant_container: Arc<PersistantContainer>,
) -> Result<Status, Error> {
let mut db = shared.ctx.db.handle();
let mut status = crate::db::DatabaseModel::new()
.package_data()
@@ -45,16 +48,16 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<Status, Error>
},
Status::Starting => match *status {
MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => {
stop(shared).await?;
stop(shared, persistant_container).await?;
}
MainStatus::Starting { .. } | MainStatus::Running { .. } => (),
MainStatus::BackingUp { .. } => {
pause(shared).await?;
pause(shared, persistant_container).await?;
}
},
Status::Running => match *status {
MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => {
stop(shared).await?;
stop(shared, persistant_container).await?;
}
MainStatus::Starting { .. } => {
*status = MainStatus::Running {
@@ -64,12 +67,12 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<Status, Error>
}
MainStatus::Running { .. } => (),
MainStatus::BackingUp { .. } => {
pause(shared).await?;
pause(shared, persistant_container).await?;
}
},
Status::Paused => match *status {
MainStatus::Stopped | MainStatus::Stopping | MainStatus::Restarting => {
stop(shared).await?;
stop(shared, persistant_container).await?;
}
MainStatus::Starting { .. } | MainStatus::Running { .. } => {
resume(shared).await?;
@@ -82,13 +85,16 @@ async fn synchronize_once(shared: &ManagerSharedState) -> Result<Status, Error>
Ok(manager_status)
}
pub async fn synchronizer(shared: &ManagerSharedState) {
pub async fn synchronizer(
shared: &ManagerSharedState,
persistant_container: Arc<PersistantContainer>,
) {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => (),
_ = shared.synchronize_now.notified() => (),
}
let status = match synchronize_once(shared).await {
let status = match synchronize_once(shared, persistant_container.clone()).await {
Err(e) => {
tracing::error!(
"Synchronizer for {}@{} failed: {}",

View File

@@ -1,11 +1,7 @@
use std::sync::Arc;
use aes::cipher::{CipherKey, NewCipher, Nonce, StreamCipher};
use aes::Aes256Ctr;
use futures::Stream;
use hmac::Hmac;
use josekit::jwk::Jwk;
use rpc_toolkit::hyper::{self, Body};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use tracing::instrument;
@@ -113,6 +109,6 @@ fn test_gen_awk() {
}"#).unwrap();
assert_eq!(
"testing12345",
&encrypted.decrypt(Arc::new(private_key)).unwrap()
&encrypted.decrypt(std::sync::Arc::new(private_key)).unwrap()
);
}

View File

@@ -10,7 +10,7 @@ use tracing::instrument;
use crate::context::RpcContext;
use crate::id::ImageId;
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::{PackageProcedure, ProcedureName};
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
@@ -27,7 +27,7 @@ impl Migrations {
#[instrument]
pub fn validate(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
@@ -58,7 +58,7 @@ impl Migrations {
#[instrument(skip(ctx))]
pub fn from<'a>(
&'a self,
container: &'a Option<DockerContainer>,
container: &'a Option<DockerContainers>,
ctx: &'a RpcContext,
version: &'a Version,
pkg_id: &'a PackageId,
@@ -70,11 +70,10 @@ impl Migrations {
.iter()
.find(|(range, _)| version.satisfies(*range))
{
Some(
Some(async move {
migration
.execute(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::Migration, // Migrations cannot be executed concurrently
@@ -88,8 +87,9 @@ impl Migrations {
Error::new(eyre!("{}", e.1), crate::ErrorKind::MigrationFailed)
})
})
}),
)
})
.await
})
} else {
None
}
@@ -98,7 +98,6 @@ impl Migrations {
#[instrument(skip(ctx))]
pub fn to<'a>(
&'a self,
container: &'a Option<DockerContainer>,
ctx: &'a RpcContext,
version: &'a Version,
pkg_id: &'a PackageId,
@@ -106,11 +105,10 @@ impl Migrations {
volumes: &'a Volumes,
) -> Option<impl Future<Output = Result<MigrationRes, Error>> + 'a> {
if let Some((_, migration)) = self.to.iter().find(|(range, _)| version.satisfies(*range)) {
Some(
Some(async move {
migration
.execute(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::Migration,
@@ -124,8 +122,9 @@ impl Migrations {
Error::new(eyre!("{}", e.1), crate::ErrorKind::MigrationFailed)
})
})
}),
)
})
.await
})
} else {
None
}

View File

@@ -9,14 +9,19 @@ use async_stream::stream;
use bollard::container::RemoveContainerOptions;
use color_eyre::eyre::eyre;
use color_eyre::Report;
use embassy_container_init::{InputJsonRpc, OutputJsonRpc};
use futures::future::Either as EitherFuture;
use futures::TryStreamExt;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use helpers::NonDetachingJoinHandle;
use nix::sys::signal;
use nix::unistd::Pid;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, BufReader},
process::Child,
sync::mpsc::UnboundedReceiver,
};
use tracing::instrument;
use super::ProcedureName;
@@ -41,6 +46,17 @@ lazy_static::lazy_static! {
};
}
#[derive(Clone, Debug, Deserialize, Serialize, patch_db::HasModel)]
#[serde(rename_all = "kebab-case")]
pub struct DockerContainers {
pub main: DockerContainer,
// #[serde(default)]
// pub aux: BTreeMap<String, DockerContainer>,
}
/// This is like the docker procedures of the past designs,
/// but this time all the entrypoints and args are not
/// part of this struct by choice. Used for the times that we are creating our own entry points
#[derive(Clone, Debug, Deserialize, Serialize, patch_db::HasModel)]
#[serde(rename_all = "kebab-case")]
pub struct DockerContainer {
@@ -49,6 +65,10 @@ pub struct DockerContainer {
pub mounts: BTreeMap<VolumeId, PathBuf>,
#[serde(default)]
pub shm_size_mb: Option<usize>, // TODO: use postfix sizing? like 1k vs 1m vs 1g
#[serde(default)]
pub sigterm_timeout: Option<SerdeDuration>,
#[serde(default)]
pub system: bool,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -70,7 +90,7 @@ pub struct DockerProcedure {
pub shm_size_mb: Option<usize>, // TODO: use postfix sizing? like 1k vs 1m vs 1g
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub struct DockerInject {
#[serde(default)]
@@ -83,26 +103,42 @@ pub struct DockerInject {
#[serde(default)]
pub sigterm_timeout: Option<SerdeDuration>,
}
impl From<(&DockerContainer, &DockerInject)> for DockerProcedure {
fn from((container, injectable): (&DockerContainer, &DockerInject)) -> Self {
impl DockerProcedure {
pub fn main_docker_procedure(
container: &DockerContainer,
injectable: &DockerInject,
) -> DockerProcedure {
DockerProcedure {
image: container.image.clone(),
system: injectable.system.clone(),
system: injectable.system,
entrypoint: injectable.entrypoint.clone(),
args: injectable.args.clone(),
mounts: container.mounts.clone(),
io_format: injectable.io_format.clone(),
sigterm_timeout: injectable.sigterm_timeout.clone(),
shm_size_mb: container.shm_size_mb.clone(),
io_format: injectable.io_format,
sigterm_timeout: injectable.sigterm_timeout,
shm_size_mb: container.shm_size_mb,
}
}
#[cfg(feature = "js_engine")]
pub fn main_docker_procedure_js(
container: &DockerContainer,
_procedure: &super::js_scripts::JsProcedure,
) -> DockerProcedure {
DockerProcedure {
image: container.image.clone(),
system: container.system,
entrypoint: "sleep".to_string(),
args: Vec::new(),
mounts: container.mounts.clone(),
io_format: None,
sigterm_timeout: container.sigterm_timeout,
shm_size_mb: container.shm_size_mb,
}
}
}
impl DockerProcedure {
pub fn validate(
&self,
eos_version: &Version,
_eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
expected_io: bool,
@@ -116,10 +152,8 @@ impl DockerProcedure {
if !SYSTEM_IMAGES.contains(&self.image) {
color_eyre::eyre::bail!("unknown system image: {}", self.image);
}
} else {
if !image_ids.contains(&self.image) {
color_eyre::eyre::bail!("image for {} not contained in package", self.image);
}
} else if !image_ids.contains(&self.image) {
color_eyre::eyre::bail!("image for {} not contained in package", self.image);
}
if expected_io && self.io_format.is_none() {
color_eyre::eyre::bail!("expected io-format");
@@ -128,7 +162,7 @@ impl DockerProcedure {
}
#[instrument(skip(ctx, input))]
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
pub async fn execute<I: Serialize, O: DeserializeOwned>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
@@ -217,7 +251,7 @@ impl DockerProcedure {
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stout"))
.ok_or_else(|| eyre!("Can't takeout stdout in execute"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
@@ -307,25 +341,83 @@ impl DockerProcedure {
)
}
/// We created a new exec runner, where we are going to be passing the commands for it to run.
/// Idea is that we are going to send it command and get the inputs be filtered back from the manager.
/// Then we could in theory run commands without the cost of running the docker exec which is known to have
/// a dely of > 200ms which is not acceptable.
#[instrument(skip(ctx, input))]
pub async fn inject<I: Serialize, O: for<'de> Deserialize<'de>>(
pub async fn long_running_execute<S>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
volumes: &Volumes,
input: S,
) -> Result<LongRunning, Error>
where
S: Stream<Item = InputJsonRpc> + Send + 'static,
{
let name = name.docker_name();
let name: Option<&str> = name.as_deref();
let container_name = Self::container_name(pkg_id, name);
let mut cmd = LongRunning::setup_long_running_docker_cmd(
self,
ctx,
&container_name,
volumes,
pkg_id,
pkg_version,
)
.await?;
let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?;
let input_handle = LongRunning::spawn_input_handle(&mut handle, input)?
.map_err(|e| eyre!("Input Handle Error: {e:?}"));
let (output, output_handle) = LongRunning::spawn_output_handle(&mut handle)?;
let output_handle = output_handle.map_err(|e| eyre!("Output Handle Error: {e:?}"));
let err_handle = LongRunning::spawn_error_handle(&mut handle)?
.map_err(|e| eyre!("Err Handle Error: {e:?}"));
let running_output = NonDetachingJoinHandle::from(tokio::spawn(async move {
if let Err(err) = tokio::select!(
x = handle.wait().map_err(|e| eyre!("Runtime error: {e:?}")) => x.map(|_| ()),
x = err_handle => x.map(|_| ()),
x = output_handle => x.map(|_| ()),
x = input_handle => x.map(|_| ())
) {
tracing::debug!("{:?}", err);
tracing::error!("Join error");
}
}));
Ok(LongRunning {
output,
running_output,
})
}
#[instrument(skip(_ctx, input))]
pub async fn inject<I: Serialize, O: DeserializeOwned>(
&self,
_ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
let name = name.docker_name();
let name: Option<&str> = name.as_ref().map(|x| &**x);
let name: Option<&str> = name.as_deref();
let mut cmd = tokio::process::Command::new("docker");
tracing::debug!("{:?} is exec", name);
cmd.arg("exec");
cmd.args(self.docker_args_inject(ctx, pkg_id, pkg_version).await?);
cmd.args(self.docker_args_inject(pkg_id).await?);
let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) {
cmd.stdin(std::process::Stdio::piped());
Some(format.to_vec(input)?)
@@ -372,7 +464,7 @@ impl DockerProcedure {
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stout"))
.ok_or_else(|| eyre!("Can't takeout stdout in inject"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
@@ -463,7 +555,7 @@ impl DockerProcedure {
}
#[instrument(skip(ctx, input))]
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
pub async fn sandboxed<I: Serialize, O: DeserializeOwned>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
@@ -513,7 +605,7 @@ impl DockerProcedure {
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stout"))
.ok_or_else(|| eyre!("Can't takeout stdout in sandboxed"))
.with_kind(crate::ErrorKind::Docker)?,
);
let output = NonDetachingJoinHandle::from(tokio::spawn(async move {
@@ -607,7 +699,7 @@ impl DockerProcedure {
continue;
};
let src = volume.path_for(&ctx.datadir, pkg_id, pkg_version, volume_id);
if let Err(e) = tokio::fs::metadata(&src).await {
if let Err(_e) = tokio::fs::metadata(&src).await {
tokio::fs::create_dir_all(&src).await?;
}
res.push(OsStr::new("--mount").into());
@@ -626,7 +718,6 @@ impl DockerProcedure {
res.push(OsString::from(format!("{}m", shm_size_mb)).into());
}
res.push(OsStr::new("--interactive").into());
res.push(OsStr::new("--log-driver=journald").into());
res.push(OsStr::new("--entrypoint").into());
res.push(OsStr::new(&self.entrypoint).into());
@@ -649,12 +740,7 @@ impl DockerProcedure {
+ self.args.len(), // [ARG...]
)
}
async fn docker_args_inject(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
) -> Result<Vec<Cow<'_, OsStr>>, Error> {
async fn docker_args_inject(&self, pkg_id: &PackageId) -> Result<Vec<Cow<'_, OsStr>>, Error> {
let mut res = self.new_docker_args();
if let Some(shm_size_mb) = self.shm_size_mb {
res.push(OsStr::new("--shm-size").into());
@@ -693,6 +779,215 @@ impl<T> RingVec<T> {
}
}
/// This is created when we wanted a long running docker executor that we could send commands to and get the responses back.
/// We wanted a long running since we want to be able to have the equivelent to the docker execute without the heavy costs of 400 + ms time lag.
/// Also the long running let's us have the ability to start/ end the services quicker.
pub struct LongRunning {
pub output: UnboundedReceiver<OutputJsonRpc>,
pub running_output: NonDetachingJoinHandle<()>,
}
impl LongRunning {
async fn setup_long_running_docker_cmd(
docker: &DockerProcedure,
ctx: &RpcContext,
container_name: &str,
volumes: &Volumes,
pkg_id: &PackageId,
pkg_version: &Version,
) -> Result<tokio::process::Command, Error> {
tracing::error!("BLUJ setup_long_running_docker_cmd {container_name}");
const INIT_EXEC: &str = "/start9/embassy_container_init";
const BIND_LOCATION: &str = "/usr/lib/embassy/container";
tracing::trace!("setup_long_running_docker_cmd");
LongRunning::cleanup_previous_container(ctx, container_name).await?;
let image_architecture = {
let mut cmd = tokio::process::Command::new("docker");
cmd.arg("image")
.arg("inspect")
.arg("--format")
.arg("'{{.Architecture}}'");
if docker.system {
cmd.arg(docker.image.for_package(SYSTEM_PACKAGE_ID, None));
} else {
cmd.arg(docker.image.for_package(pkg_id, Some(pkg_version)));
}
let arch = String::from_utf8(cmd.output().await?.stdout)?;
arch.replace('\'', "").trim().to_string()
};
let mut cmd = tokio::process::Command::new("docker");
cmd.arg("run")
.arg("--network=start9")
.arg(format!("--add-host=embassy:{}", Ipv4Addr::from(HOST_IP)))
.arg("--mount")
.arg(format!("type=bind,src={BIND_LOCATION},dst=/start9"))
.arg("--name")
.arg(&container_name)
.arg(format!("--hostname={}", &container_name))
.arg("--entrypoint")
.arg(format!("{INIT_EXEC}.{image_architecture}"))
.arg("-i")
.arg("--rm");
for (volume_id, dst) in &docker.mounts {
let volume = if let Some(v) = volumes.get(volume_id) {
v
} else {
continue;
};
let src = volume.path_for(&ctx.datadir, pkg_id, pkg_version, volume_id);
if let Err(_e) = tokio::fs::metadata(&src).await {
tokio::fs::create_dir_all(&src).await?;
}
cmd.arg("--mount").arg(format!(
"type=bind,src={},dst={}{}",
src.display(),
dst.display(),
if volume.readonly() { ",readonly" } else { "" }
));
}
if let Some(shm_size_mb) = docker.shm_size_mb {
cmd.arg("--shm-size").arg(format!("{}m", shm_size_mb));
}
cmd.arg("--log-driver=journald");
if docker.system {
cmd.arg(docker.image.for_package(SYSTEM_PACKAGE_ID, None));
} else {
cmd.arg(docker.image.for_package(pkg_id, Some(pkg_version)));
}
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
cmd.stdin(std::process::Stdio::piped());
Ok(cmd)
}
async fn cleanup_previous_container(
ctx: &RpcContext,
container_name: &str,
) -> Result<(), Error> {
match ctx
.docker
.remove_container(
container_name,
Some(RemoveContainerOptions {
v: false,
force: true,
link: false,
}),
)
.await
{
Ok(())
| Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, // NOT FOUND
..
}) => Ok(()),
Err(e) => Err(e)?,
}
}
fn spawn_input_handle<S>(
handle: &mut Child,
input: S,
) -> Result<NonDetachingJoinHandle<()>, Error>
where
S: Stream<Item = InputJsonRpc> + Send + 'static,
{
use tokio::io::AsyncWriteExt;
let mut stdin = handle
.stdin
.take()
.ok_or_else(|| eyre!("Can't takeout stdin"))
.with_kind(crate::ErrorKind::Docker)?;
let handle = NonDetachingJoinHandle::from(tokio::spawn(async move {
let input = input;
tokio::pin!(input);
while let Some(input) = input.next().await {
let input = match serde_json::to_string(&input) {
Ok(a) => a,
Err(e) => {
tracing::debug!("{:?}", e);
tracing::error!("Docker Input Serialization issue");
continue;
}
};
if let Err(e) = stdin.write_all(format!("{input}\n").as_bytes()).await {
tracing::debug!("{:?}", e);
tracing::error!("Docker Input issue");
return;
}
}
}));
Ok(handle)
}
fn spawn_error_handle(handle: &mut Child) -> Result<NonDetachingJoinHandle<()>, Error> {
let id = handle.id();
let mut output = tokio::io::BufReader::new(
handle
.stderr
.take()
.ok_or_else(|| eyre!("Can't takeout stderr"))
.with_kind(crate::ErrorKind::Docker)?,
)
.lines();
Ok(NonDetachingJoinHandle::from(tokio::spawn(async move {
while let Ok(Some(line)) = output.next_line().await {
tracing::debug!("{:?}", id);
tracing::error!("Error from long running container");
tracing::error!("{}", line);
}
})))
}
fn spawn_output_handle(
handle: &mut Child,
) -> Result<(UnboundedReceiver<OutputJsonRpc>, NonDetachingJoinHandle<()>), Error> {
let mut output = tokio::io::BufReader::new(
handle
.stdout
.take()
.ok_or_else(|| eyre!("Can't takeout stdout for long running"))
.with_kind(crate::ErrorKind::Docker)?,
)
.lines();
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<OutputJsonRpc>();
Ok((
receiver,
NonDetachingJoinHandle::from(tokio::spawn(async move {
loop {
let next = output.next_line().await;
let next = match next {
Ok(Some(a)) => a,
Ok(None) => {
tracing::error!("The docker pipe is closed?");
break;
}
Err(e) => {
tracing::debug!("{:?}", e);
tracing::error!("Output from docker, killing");
break;
}
};
let next = match serde_json::from_str(&next) {
Ok(a) => a,
Err(_e) => {
tracing::trace!("Could not decode output from long running binary");
continue;
}
};
if let Err(e) = sender.send(next) {
tracing::debug!("{:?}", e);
tracing::error!("Could no longer send output");
break;
}
}
})),
))
}
}
async fn buf_reader_to_lines(
reader: impl AsyncBufRead + Unpin,
limit: impl Into<Option<usize>>,
@@ -756,6 +1051,7 @@ async fn max_by_lines(
}
MaxByLines::Done(answer)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,10 +1,12 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
pub use js_engine::JsError;
use js_engine::{JsExecutionEnvironment, PathForVolumeId};
use models::VolumeId;
use serde::{Deserialize, Serialize};
use models::{ExecCommand, TermCommand};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::instrument;
use super::ProcedureName;
@@ -52,8 +54,8 @@ impl JsProcedure {
Ok(())
}
#[instrument(skip(directory, input))]
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
#[instrument(skip(directory, input, exec_command, term_command))]
pub async fn execute<I: Serialize, O: DeserializeOwned>(
&self,
directory: &PathBuf,
pkg_id: &PackageId,
@@ -62,6 +64,8 @@ impl JsProcedure {
volumes: &Volumes,
input: Option<I>,
timeout: Option<Duration>,
exec_command: ExecCommand,
term_command: TermCommand,
) -> Result<Result<O, (i32, String)>, Error> {
Ok(async move {
let running_action = JsExecutionEnvironment::load_from_package(
@@ -69,6 +73,8 @@ impl JsProcedure {
pkg_id,
pkg_version,
Box::new(volumes.clone()),
exec_command,
term_command,
)
.await?
.run_action(name, input, self.args.clone());
@@ -86,7 +92,7 @@ impl JsProcedure {
}
#[instrument(skip(ctx, input))]
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
pub async fn sandboxed<I: Serialize, O: DeserializeOwned>(
&self,
ctx: &RpcContext,
pkg_id: &PackageId,
@@ -102,6 +108,12 @@ impl JsProcedure {
pkg_id,
pkg_version,
Box::new(volumes.clone()),
Arc::new(|_, _, _, _| {
Box::pin(async { Err("Can't run commands in sandox mode".to_string()) })
}),
Arc::new(|_| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
)
.await?
.read_only_effects()
@@ -120,7 +132,7 @@ impl JsProcedure {
}
}
fn unwrap_known_error<O: for<'de> Deserialize<'de>>(
fn unwrap_known_error<O: DeserializeOwned>(
error_value: ErrorValue,
) -> Result<O, (JsError, String)> {
match error_value {
@@ -181,6 +193,10 @@ async fn js_action_execute() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap()
@@ -236,6 +252,10 @@ async fn js_action_execute_error() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap();
@@ -280,11 +300,70 @@ async fn js_action_fetch() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn js_test_slow() {
let js_action = JsProcedure { args: vec![] };
let path: PathBuf = "test/js_action_execute/"
.parse::<PathBuf>()
.unwrap()
.canonicalize()
.unwrap();
let package_id = "test-package".parse().unwrap();
let package_version: Version = "0.3.0.3".parse().unwrap();
let name = ProcedureName::Action("slow".parse().unwrap());
let volumes: Volumes = serde_json::from_value(serde_json::json!({
"main": {
"type": "data"
},
"compat": {
"type": "assets"
},
"filebrowser" :{
"package-id": "filebrowser",
"path": "data",
"readonly": true,
"type": "pointer",
"volume-id": "main",
}
}))
.unwrap();
let input: Option<serde_json::Value> = None;
let timeout = Some(Duration::from_secs(10));
tracing::debug!("testing start");
tokio::select! {
a = js_action
.execute::<serde_json::Value, serde_json::Value>(
&path,
&package_id,
&package_version,
name,
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
=> {a
.unwrap()
.unwrap();},
_ = tokio::time::sleep(Duration::from_secs(1)) => ()
}
tracing::debug!("testing end should");
tokio::time::sleep(Duration::from_secs(2)).await;
tracing::debug!("Done");
}
#[tokio::test]
async fn js_action_var_arg() {
let js_action = JsProcedure {
@@ -325,6 +404,10 @@ async fn js_action_var_arg() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap()
@@ -369,6 +452,10 @@ async fn js_action_test_rename() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap()
@@ -413,6 +500,10 @@ async fn js_action_test_deep_dir() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap()
@@ -456,6 +547,10 @@ async fn js_action_test_deep_dir_escape() {
&volumes,
input,
timeout,
Arc::new(|_, _, _, _| {
Box::pin(async move { Err("Can't run commands in test".to_string()) })
}),
Arc::new(|_| Box::pin(async move { Err("Can't run commands in test".to_string()) })),
)
.await
.unwrap()

View File

@@ -1,18 +1,18 @@
use std::collections::BTreeSet;
use std::time::Duration;
use color_eyre::eyre::{bail, eyre};
use color_eyre::eyre::eyre;
use patch_db::HasModel;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::instrument;
use self::docker::{DockerContainer, DockerInject, DockerProcedure};
use self::docker::{DockerContainers, DockerProcedure};
use crate::context::RpcContext;
use crate::id::ImageId;
use crate::s9pk::manifest::PackageId;
use crate::util::Version;
use crate::volume::Volumes;
use crate::Error;
use crate::{Error, ErrorKind};
pub mod docker;
#[cfg(feature = "js_engine")]
@@ -26,7 +26,6 @@ pub use models::ProcedureName;
#[serde(tag = "type")]
pub enum PackageProcedure {
Docker(DockerProcedure),
DockerInject(DockerInject),
#[cfg(feature = "js_engine")]
Script(js_scripts::JsProcedure),
@@ -43,7 +42,7 @@ impl PackageProcedure {
#[instrument]
pub fn validate(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
@@ -53,25 +52,15 @@ impl PackageProcedure {
PackageProcedure::Docker(action) => {
action.validate(eos_version, volumes, image_ids, expected_io)
}
PackageProcedure::DockerInject(injectable) => {
let container = match container {
None => bail!("For the docker injectable procedure, a container must be exist on the config"),
Some(container) => container,
} ;
let docker_procedure: DockerProcedure = (container, injectable).into();
docker_procedure.validate(eos_version, volumes, image_ids, expected_io)
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(action) => action.validate(volumes),
}
}
#[instrument(skip(ctx, input, container))]
pub async fn execute<I: Serialize, O: for<'de> Deserialize<'de>>(
#[instrument(skip(ctx, input))]
pub async fn execute<I: Serialize, O: DeserializeOwned + 'static>(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
@@ -86,18 +75,36 @@ impl PackageProcedure {
.execute(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
PackageProcedure::DockerInject(injectable) => {
let container = match container {
None => return Err(Error::new(eyre!("For the docker injectable procedure, a container must be exist on the config"), crate::ErrorKind::Action)),
Some(container) => container,
} ;
let docker_procedure: DockerProcedure = (container, injectable).into();
docker_procedure
.inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(procedure) => {
let exec_command = match ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
{
None => {
return Err(Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
))
}
Some(x) => x,
}
.exec_command();
let term_command = match ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
{
None => {
return Err(Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
))
}
Some(x) => x,
}
.term_command();
procedure
.execute(
&ctx.datadir,
@@ -107,17 +114,18 @@ impl PackageProcedure {
volumes,
input,
timeout,
exec_command,
term_command,
)
.await
}
}
}
#[instrument(skip(ctx, input, container))]
pub async fn inject<I: Serialize, O: for<'de> Deserialize<'de>>(
#[instrument(skip(ctx, input))]
pub async fn inject<I: Serialize, O: DeserializeOwned + 'static>(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
pkg_id: &PackageId,
pkg_version: &Version,
name: ProcedureName,
@@ -125,25 +133,42 @@ impl PackageProcedure {
input: Option<I>,
timeout: Option<Duration>,
) -> Result<Result<O, (i32, String)>, Error> {
tracing::trace!("Procedure inject {} {} - {:?}", self, pkg_id, name);
match self {
PackageProcedure::Docker(procedure) => {
procedure
.inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
PackageProcedure::DockerInject(injectable) => {
let container = match container {
None => return Err(Error::new(eyre!("For the docker injectable procedure, a container must be exist on the config"), crate::ErrorKind::Action)),
Some(container) => container,
} ;
let docker_procedure: DockerProcedure = (container, injectable).into();
docker_procedure
.inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout)
.await
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(procedure) => {
let exec_command = match ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
{
None => {
return Err(Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
))
}
Some(x) => x,
}
.exec_command();
let term_command = match ctx
.managers
.get(&(pkg_id.clone(), pkg_version.clone()))
.await
{
None => {
return Err(Error::new(
eyre!("No manager found for {}", pkg_id),
ErrorKind::NotFound,
))
}
Some(x) => x,
}
.term_command();
procedure
.execute(
&ctx.datadir,
@@ -153,15 +178,17 @@ impl PackageProcedure {
volumes,
input,
timeout,
exec_command,
term_command,
)
.await
}
}
}
#[instrument(skip(ctx, input))]
pub async fn sandboxed<I: Serialize, O: for<'de> Deserialize<'de>>(
pub async fn sandboxed<I: Serialize, O: DeserializeOwned>(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
ctx: &RpcContext,
pkg_id: &PackageId,
pkg_version: &Version,
@@ -177,16 +204,6 @@ impl PackageProcedure {
.sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout)
.await
}
PackageProcedure::DockerInject(injectable) => {
let container = match container {
None => return Err(Error::new(eyre!("For the docker injectable procedure, a container must be exist on the config"), crate::ErrorKind::Action)),
Some(container) => container,
} ;
let docker_procedure: DockerProcedure = (container, injectable).into();
docker_procedure
.sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout)
.await
}
#[cfg(feature = "js_engine")]
PackageProcedure::Script(procedure) => {
procedure
@@ -200,7 +217,6 @@ impl PackageProcedure {
impl std::fmt::Display for PackageProcedure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PackageProcedure::DockerInject(_) => write!(f, "Docker Injectable")?,
PackageProcedure::Docker(_) => write!(f, "Docker")?,
#[cfg(feature = "js_engine")]
PackageProcedure::Script(_) => write!(f, "JS")?,
@@ -208,6 +224,7 @@ impl std::fmt::Display for PackageProcedure {
Ok(())
}
}
#[derive(Debug)]
pub struct NoOutput;
impl<'de> Deserialize<'de> for NoOutput {

View File

@@ -21,6 +21,7 @@ pub async fn properties(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Res
#[instrument(skip(ctx))]
pub async fn fetch_properties(ctx: RpcContext, id: PackageId) -> Result<Value, Error> {
let mut db = ctx.db.handle();
let manifest: Manifest = crate::db::DatabaseModel::new()
.package_data()
.idx_model(&id)
@@ -34,7 +35,6 @@ pub async fn fetch_properties(ctx: RpcContext, id: PackageId) -> Result<Value, E
props
.execute::<(), Value>(
&ctx,
&manifest.container,
&manifest.id,
&manifest.version,
ProcedureName::Properties,

View File

@@ -12,7 +12,7 @@ use crate::config::action::ConfigActions;
use crate::dependencies::Dependencies;
use crate::migration::Migrations;
use crate::net::interface::Interfaces;
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::PackageProcedure;
use crate::status::health_check::HealthChecks;
use crate::util::Version;
@@ -72,7 +72,7 @@ pub struct Manifest {
#[model]
pub dependencies: Dependencies,
#[model]
pub container: Option<DockerContainer>,
pub containers: Option<DockerContainers>,
}
impl Manifest {

View File

@@ -176,7 +176,7 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
}
let image_tags = self.image_tags().await?;
let man = self.manifest().await?;
let container = &man.container;
let containers = &man.containers;
let validated_image_ids = image_tags
.into_iter()
.map(|i| i.validate(&man.id, &man.version).map(|_| i.image_id))
@@ -187,7 +187,7 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
.iter()
.map(|(_, action)| {
action.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
@@ -195,21 +195,21 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
})
.collect::<Result<(), Error>>()?;
man.backup.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
)?;
if let Some(cfg) = &man.config {
cfg.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
)?;
}
man.health_checks.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
@@ -217,7 +217,7 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
man.interfaces.validate()?;
man.main
.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
@@ -225,7 +225,7 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
)
.with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Main"))?;
man.migrations.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,
@@ -233,7 +233,7 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send + Sync> S9pkReader<R> {
if let Some(props) = &man.properties {
props
.validate(
container,
containers,
&man.eos_version,
&man.volumes,
&validated_image_ids,

View File

@@ -7,7 +7,7 @@ use tracing::instrument;
use crate::context::RpcContext;
use crate::id::ImageId;
use crate::procedure::docker::DockerContainer;
use crate::procedure::docker::DockerContainers;
use crate::procedure::{NoOutput, PackageProcedure, ProcedureName};
use crate::s9pk::manifest::PackageId;
use crate::util::serde::Duration;
@@ -21,7 +21,7 @@ impl HealthChecks {
#[instrument]
pub fn validate(
&self,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
eos_version: &Version,
volumes: &Volumes,
image_ids: &BTreeSet<ImageId>,
@@ -42,7 +42,7 @@ impl HealthChecks {
pub async fn check_all(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
started: DateTime<Utc>,
pkg_id: &PackageId,
pkg_version: &Version,
@@ -75,7 +75,7 @@ impl HealthCheck {
pub async fn check(
&self,
ctx: &RpcContext,
container: &Option<DockerContainer>,
container: &Option<DockerContainers>,
id: &HealthCheckId,
started: DateTime<Utc>,
pkg_id: &PackageId,
@@ -86,7 +86,6 @@ impl HealthCheck {
.implementation
.execute(
ctx,
container,
pkg_id,
pkg_version,
ProcedureName::Health(id.clone()),

View File

@@ -7,7 +7,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use color_eyre::eyre::eyre;
use futures::{FutureExt, Stream};
use futures::Stream;
use http::header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE};
use hyper::body::Bytes;
use pin_project::pin_project;
@@ -26,29 +26,29 @@ pub struct HttpReader {
read_in_progress: ReadInProgress,
}
type InProgress = Pin<
Box<
dyn Future<
Output = Result<
Pin<
Box<
dyn Stream<Item = Result<Bytes, reqwest::Error>>
+ Send
+ Sync
+ 'static,
>,
>,
Error,
>,
> + Send
+ Sync
+ 'static,
>,
>;
enum ReadInProgress {
None,
InProgress(
Pin<
Box<
dyn Future<
Output = Result<
Pin<
Box<
dyn Stream<Item = Result<Bytes, reqwest::Error>>
+ Send
+ Sync
+ 'static,
>,
>,
Error,
>,
> + Send
+ Sync
+ 'static,
>,
>,
),
InProgress(InProgress),
Complete(Pin<Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Send + Sync + 'static>>),
}
impl ReadInProgress {

View File

@@ -1,11 +1,6 @@
use std::path::Path;
use emver::VersionRange;
use tokio::process::Command;
use super::*;
use crate::disk::BOOT_RW_PATH;
use crate::util::Invoke;
const V0_3_0_1: emver::Version = emver::Version::new(0, 3, 0, 1);

View File

@@ -17,7 +17,7 @@ impl VersionT for Version {
fn compat(&self) -> &'static emver::VersionRange {
&*V0_3_0_COMPAT
}
async fn up<Db: DbHandle>(&self, db: &mut Db) -> Result<(), Error> {
async fn up<Db: DbHandle>(&self, _db: &mut Db) -> Result<(), Error> {
Ok(())
}
async fn down<Db: DbHandle>(&self, _db: &mut Db) -> Result<(), Error> {

View File

@@ -733,8 +733,28 @@ const assert = (condition, message) => {
throw new Error(message);
}
};
const ackermann = (m, n) => {
if (m === 0) {
return n+1
}
if (n === 0) {
return ackermann((m - 1), 1);
}
if (m !== 0 && n !== 0) {
return ackermann((m-1), ackermann(m, (n-1)))
}
}
export const action = {
async slow(effects, _input) {
while(true) {
effects.error("A");
// await ackermann(3,10);
await effects.sleep(100);
}
},
async fetch(effects, _input) {
const example = await effects.fetch(
"https://postman-echo.com/get?foo1=bar1&foo2=bar2"

View File

@@ -0,0 +1,8 @@
#!/bin/bash
for mozilladir in $(find /home -name ".mozilla"); do
for certDB in $(find ${mozilladir} -name "cert9.db"); do
certDir=$(dirname ${certDB});
certutil -A -n "Embassy Local Root CA" -t "TCu,Cuw,Tuw" -i /usr/local/share/ca-certificates/embassy-root-ca.crt -d ${certDir}
done
done

View File

@@ -12,4 +12,4 @@ ion-split-pane {
&_offline {
filter: saturate(0.75) contrast(0.85);
}
}
}

108
libs/Cargo.lock generated
View File

@@ -47,6 +47,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.58"
@@ -73,6 +82,27 @@ dependencies = [
"syn",
]
[[package]]
name = "async-stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.56"
@@ -414,6 +444,23 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f107b87b6afc2a64fd13cac55fe06d6c8859f12d4b14cbcdd2c67d0976781be"
[[package]]
name = "embassy_container_init"
version = "0.1.0"
dependencies = [
"async-stream",
"color-eyre",
"futures",
"serde",
"serde_json",
"tokio",
"tokio-stream",
"tracing",
"tracing-error 0.2.0",
"tracing-futures",
"tracing-subscriber 0.3.11",
]
[[package]]
name = "emver"
version = "0.1.6"
@@ -899,10 +946,12 @@ dependencies = [
name = "js_engine"
version = "0.1.0"
dependencies = [
"async-trait",
"dashmap",
"deno_ast",
"deno_core",
"dprint-swc-ext",
"embassy_container_init",
"helpers",
"models",
"reqwest",
@@ -1071,6 +1120,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
@@ -1123,11 +1181,13 @@ dependencies = [
name = "models"
version = "0.1.0"
dependencies = [
"embassy_container_init",
"emver",
"patch-db",
"rand",
"serde",
"thiserror",
"tokio",
]
[[package]]
@@ -1573,6 +1633,15 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.27"
@@ -2421,6 +2490,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.3"
@@ -2503,6 +2583,27 @@ dependencies = [
"tracing-subscriber 0.3.11",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
@@ -2520,9 +2621,16 @@ version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [
"ansi_term",
"lazy_static",
"matchers",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]

View File

@@ -4,5 +4,6 @@ members = [
"snapshot-creator",
"models",
"js_engine",
"helpers"
"helpers",
"embassy-container-init",
]

View File

@@ -19,7 +19,7 @@ docker run --rm $USE_TTY -v "$HOME/.cargo/registry":/root/.cargo/registry -v "$(
cd -
echo "Creating Arm v8 Snapshot"
docker run --platform linux/arm64/v8 --mount type=bind,src=$(pwd),dst=/mnt arm64v8/ubuntu:20.04 /bin/sh -c "cd /mnt && /mnt/target/aarch64-unknown-linux-gnu/release/snapshot-creator"
docker run $USE_TTY --platform linux/arm64/v8 --mount type=bind,src=$(pwd),dst=/mnt arm64v8/ubuntu:20.04 /bin/sh -c "cd /mnt && /mnt/target/aarch64-unknown-linux-gnu/release/snapshot-creator"
sudo chown -R $USER target
sudo chown -R $USER ~/.cargo
sudo chown $USER JS_SNAPSHOT.bin

View File

@@ -0,0 +1,30 @@
[package]
name = "embassy_container_init"
version = "0.1.0"
edition = "2021"
[features]
dev = []
metal = []
sound = []
unstable = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-stream = "0.3.*"
color-eyre = "0.6.*"
futures = "0.3.*"
serde = { version = "1.*", features = ["derive", "rc"] }
serde_json = "1.*"
tokio = { version = "1.*", features = ["full"] }
tokio-stream = { version = "0.1.11" }
tracing = "0.1.*"
tracing-error = "0.2.*"
tracing-futures = "0.2.*"
tracing-subscriber = { version = "0.3.*", features = ["env-filter"] }
[profile.test]
opt-level = 3
[profile.dev.package.backtrace]
opt-level = 3

View File

@@ -0,0 +1,117 @@
use serde::{Deserialize, Serialize};
use tracing::instrument;
/// The inputs that the executable is expecting
pub type InputJsonRpc = JsonRpc<Input>;
/// The outputs that the executable is expected to output
pub type OutputJsonRpc = JsonRpc<Output>;
/// Based on the jsonrpc spec, but we are limiting the rpc to a subset
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[serde(untagged)]
pub enum RpcId {
UInt(u32),
}
/// We use the JSON rpc as the format to share between the stdin and stdout for the executable.
/// Note: We are not allowing the id to not exist, used to ensure all pairs of messages are tracked
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct JsonRpc<T> {
id: RpcId,
#[serde(flatten)]
pub version_rpc: VersionRpc<T>,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(tag = "jsonrpc", rename_all = "camelCase")]
pub enum VersionRpc<T> {
#[serde(rename = "2.0")]
Two(T),
}
impl<T> JsonRpc<T>
where
T: Serialize + for<'de> serde::Deserialize<'de> + std::fmt::Debug,
{
/// Using this to simplify creating this nested struct. Used for creating input mostly for executable stdin
pub fn new(id: RpcId, body: T) -> Self {
JsonRpc {
id,
version_rpc: VersionRpc::Two(body),
}
}
/// Use this to get the data out of the probably destructed output
pub fn into_pair(self) -> (RpcId, T) {
let Self { id, version_rpc } = self;
let VersionRpc::Two(body) = version_rpc;
(id, body)
}
/// Used during the execution.
#[instrument]
pub fn maybe_serialize(&self) -> Option<String> {
match serde_json::to_string(self) {
Ok(x) => Some(x),
Err(e) => {
tracing::warn!("Could not stringify and skipping");
tracing::debug!("{:?}", e);
None
}
}
}
/// Used during the execution
#[instrument]
pub fn maybe_parse(s: &str) -> Option<Self> {
match serde_json::from_str::<Self>(s) {
Ok(a) => Some(a),
Err(e) => {
tracing::warn!("Could not parse and skipping: {}", s);
tracing::debug!("{:?}", e);
None
}
}
}
}
/// Outputs embedded in the JSONRpc output of the executable.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(tag = "method", content = "params", rename_all = "camelCase")]
pub enum Output {
/// This is the line buffered output of the command
Line(String),
/// This is some kind of error with the program
Error(String),
/// Indication that the command is done
Done(Option<i32>),
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(tag = "method", content = "params", rename_all = "camelCase")]
pub enum Input {
/// Create a new command, with the args
Command { command: String, args: Vec<String> },
/// Send the sigkill to the process
Kill(),
/// Send the sigterm to the process
Term(),
}
#[test]
fn example_echo_line() {
let input = r#"{"id":0,"jsonrpc":"2.0","method":"command","params":{"command":"echo","args":["world I am here"]}}"#;
let new_input = JsonRpc::<Input>::maybe_parse(input);
assert!(new_input.is_some());
assert_eq!(input, &serde_json::to_string(&new_input.unwrap()).unwrap());
}
#[test]
fn example_input_line() {
let output = JsonRpc::new(RpcId::UInt(0), Output::Line("world I am here".to_string()));
let output_str = output.maybe_serialize();
assert!(output_str.is_some());
let output_str = output_str.unwrap();
assert_eq!(
&output_str,
r#"{"id":0,"jsonrpc":"2.0","method":"line","params":"world I am here"}"#
);
assert_eq!(output, serde_json::from_str(&output_str).unwrap());
}

View File

@@ -0,0 +1,296 @@
use std::{collections::BTreeMap, process::Stdio, sync::Arc};
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use tokio::{
io::AsyncBufReadExt,
process::Child,
select,
sync::{oneshot, Mutex},
};
use tokio::{io::BufReader, process::Command};
use tracing::instrument;
use embassy_container_init::{Input, InputJsonRpc, JsonRpc, Output, OutputJsonRpc, RpcId};
const MAX_COMMANDS: usize = 10;
enum DoneProgramStatus {
Wait(Result<std::process::ExitStatus, std::io::Error>),
Killed,
}
/// Created from the child and rpc, to prove that the cmd was the one who died
struct DoneProgram {
id: RpcId,
status: DoneProgramStatus,
}
/// Used to attach the running command with the rpc
struct ChildAndRpc {
id: RpcId,
child: Child,
}
impl ChildAndRpc {
fn new(id: RpcId, mut command: tokio::process::Command) -> ::std::io::Result<Self> {
Ok(Self {
id,
child: command.spawn()?,
})
}
async fn wait(&mut self) -> DoneProgram {
let status = DoneProgramStatus::Wait(self.child.wait().await);
DoneProgram {
id: self.id.clone(),
status,
}
}
async fn kill(mut self) -> DoneProgram {
if let Err(err) = self.child.kill().await {
let id = &self.id;
tracing::error!("Error while trying to kill a process {id:?}");
tracing::debug!("{err:?}");
}
DoneProgram {
id: self.id.clone(),
status: DoneProgramStatus::Killed,
}
}
}
/// Controlls the tracing + other io events
/// Can get the inputs from stdin
/// Can start a command from an intputrpc returning stream of outputs
/// Can output to stdout
#[derive(Debug, Clone)]
struct Io {
commands: Arc<Mutex<BTreeMap<RpcId, oneshot::Sender<()>>>>,
ids: Arc<Mutex<BTreeMap<RpcId, u32>>>,
}
impl Io {
fn start() -> Self {
use tracing_error::ErrorLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};
let filter_layer = EnvFilter::new("embassy_container_init=trace");
let fmt_layer = fmt::layer().with_target(true);
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.with(ErrorLayer::default())
.init();
color_eyre::install().unwrap();
Self {
commands: Default::default(),
ids: Default::default(),
}
}
#[instrument]
fn command(&self, input: InputJsonRpc) -> impl Stream<Item = OutputJsonRpc> {
let io = self.clone();
stream! {
let (id, command) = input.into_pair();
match command {
Input::Command {
ref command,
ref args,
} => {
let mut cmd = Command::new(command);
cmd.args(args);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child_and_rpc = match ChildAndRpc::new(id.clone(), cmd) {
Err(_e) => return,
Ok(a) => a,
};
if let Some(child_id) = child_and_rpc.child.id() {
io.ids.lock().await.insert(id.clone(), child_id);
}
let stdout = child_and_rpc.child
.stdout
.take()
.expect("child did not have a handle to stdout");
let stderr = child_and_rpc.child
.stderr
.take()
.expect("child did not have a handle to stderr");
let mut buff_out = BufReader::new(stdout).lines();
let mut buff_err = BufReader::new(stderr).lines();
let spawned = tokio::spawn({
let id = id.clone();
async move {
let end_command_receiver = io.create_end_command(id.clone()).await;
tokio::select!{
waited = child_and_rpc
.wait() => {
io.clean_id(&waited).await;
match &waited.status {
DoneProgramStatus::Wait(Ok(st)) => return st.code(),
DoneProgramStatus::Wait(Err(err)) => tracing::debug!("Child {id:?} got error: {err:?}"),
DoneProgramStatus::Killed => tracing::debug!("Child {id:?} already killed?"),
}
},
_ = end_command_receiver => {
let status = child_and_rpc.kill().await;
io.clean_id(&status).await;
},
}
None
}
});
while let Ok(Some(line)) = buff_out.next_line().await {
let output = Output::Line(line);
let output = JsonRpc::new(id.clone(), output);
tracing::trace!("OutputJsonRpc {{ id, output_rpc }} = {:?}", output);
yield output;
}
while let Ok(Some(line)) = buff_err.next_line().await {
yield JsonRpc::new(id.clone(), Output::Error(line));
}
let code = spawned.await.ok().flatten();
yield JsonRpc::new(id, Output::Done(code));
},
Input::Kill() => {
io.trigger_end_command(id).await;
}
Input::Term() => {
io.term_by_rpc(&id).await;
}
}
}
}
/// Used to get the string lines from the stdin
fn inputs(&self) -> impl Stream<Item = String> {
use std::io::BufRead;
let (sender, receiver) = tokio::sync::mpsc::channel(100);
tokio::task::spawn_blocking(move || {
let stdin = std::io::stdin();
for line in stdin.lock().lines().flatten() {
tracing::trace!("Line = {}", line);
sender.blocking_send(line).unwrap();
}
});
tokio_stream::wrappers::ReceiverStream::new(receiver)
}
///Convert a stream of string to stdout
async fn output(&self, outputs: impl Stream<Item = String>) {
pin_mut!(outputs);
while let Some(output) = outputs.next().await {
tracing::info!("{}", output);
println!("{}", output);
}
}
/// Helper for the command fn
/// Part of a pair for the signal map, that indicates that we should kill the command
async fn trigger_end_command(&self, id: RpcId) {
if let Some(command) = self.commands.lock().await.remove(&id) {
if command.send(()).is_err() {
tracing::trace!("Command {id:?} could not be ended, possible error or was done");
}
}
}
/// Helper for the command fn
/// Part of a pair for the signal map, that indicates that we should kill the command
async fn create_end_command(&self, id: RpcId) -> oneshot::Receiver<()> {
let (send, receiver) = oneshot::channel();
if let Some(other_command) = self.commands.lock().await.insert(id.clone(), send) {
if other_command.send(()).is_err() {
tracing::trace!(
"Found other command {id:?} could not be ended, possible error or was done"
);
}
}
receiver
}
/// Used during cleaning up a procress
async fn clean_id(
&self,
done_program: &DoneProgram,
) -> (Option<u32>, Option<oneshot::Sender<()>>) {
(
self.ids.lock().await.remove(&done_program.id),
self.commands.lock().await.remove(&done_program.id),
)
}
/// Given the rpcid, will try and term the running command
async fn term_by_rpc(&self, rpc: &RpcId) {
let output = match self.remove_cmd_id(rpc).await {
Some(id) => {
let mut cmd = tokio::process::Command::new("kill");
cmd.arg(format!("{id}"));
cmd.output().await
}
None => return,
};
match output {
Ok(_) => (),
Err(err) => {
tracing::error!("Could not kill rpc {rpc:?}");
tracing::debug!("{err}");
}
}
}
/// Used as a cleanup
async fn term_all(self) {
let ids: Vec<_> = self.ids.lock().await.keys().cloned().collect();
for id in ids {
self.term_by_rpc(&id).await;
}
}
async fn remove_cmd_id(&self, rpc: &RpcId) -> Option<u32> {
self.ids.lock().await.remove(rpc)
}
}
#[tokio::main]
async fn main() {
use futures::StreamExt;
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigquit = signal(SignalKind::quit()).unwrap();
let mut sighangup = signal(SignalKind::hangup()).unwrap();
let io = Io::start();
let outputs = io
.inputs()
.filter_map(|x| async move { InputJsonRpc::maybe_parse(&x) })
.flat_map_unordered(MAX_COMMANDS, |x| io.command(x).boxed())
.filter_map(|x| async move { x.maybe_serialize() });
select! {
_ = io.output(outputs) => {
tracing::debug!("Done with inputs/outputs")
},
_ = sigint.recv() => {
tracing::debug!("Sigint")
},
_ = sigterm.recv() => {
tracing::debug!("Sig Term")
},
_ = sigquit.recv() => {
tracing::debug!("Sigquit")
},
_ = sighangup.recv() => {
tracing::debug!("Sighangup")
}
}
io.term_all().await;
::std::process::exit(0);
}

View File

@@ -1,9 +1,10 @@
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;
use color_eyre::eyre::{eyre, Context, Error};
use futures::future::BoxFuture;
use futures::future::{pending, BoxFuture};
use futures::FutureExt;
use tokio::fs::File;
use tokio::sync::oneshot;
@@ -208,3 +209,80 @@ impl<T: 'static + Send> TimedResource<T> {
self.ready.is_closed()
}
}
type SingThreadTask<T> = futures::future::Select<
futures::future::Then<
oneshot::Receiver<T>,
futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>,
fn(
Result<T, oneshot::error::RecvError>,
)
-> futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>,
>,
futures::future::Then<
JoinHandle<()>,
futures::future::Pending<T>,
fn(Result<(), JoinError>) -> futures::future::Pending<T>,
>,
>;
#[pin_project::pin_project(PinnedDrop)]
pub struct SingleThreadJoinHandle<T> {
abort: Option<oneshot::Sender<()>>,
#[pin]
task: SingThreadTask<T>,
}
impl<T: Send + 'static> SingleThreadJoinHandle<T> {
pub fn new<Fut: Future<Output = T>>(fut: impl FnOnce() -> Fut + Send + 'static) -> Self {
let (abort, abort_recv) = oneshot::channel();
let (return_val_send, return_val) = oneshot::channel();
fn unwrap_recv_or_pending<T>(
res: Result<T, oneshot::error::RecvError>,
) -> futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>
{
match res {
Ok(a) => futures::future::Either::Left(futures::future::ready(a)),
_ => futures::future::Either::Right(pending()),
}
}
fn make_pending<T>(_: Result<(), JoinError>) -> futures::future::Pending<T> {
pending()
}
Self {
abort: Some(abort),
task: futures::future::select(
return_val.then(unwrap_recv_or_pending),
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
tokio::select! {
_ = abort_recv.fuse() => (),
res = fut().fuse() => {let _error = return_val_send.send(res);},
}
})
})
.then(make_pending),
),
}
}
}
impl<T: Send> Future for SingleThreadJoinHandle<T> {
type Output = T;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.task.poll(cx).map(|t| t.factor_first().0)
}
}
#[pin_project::pinned_drop]
impl<T> PinnedDrop for SingleThreadJoinHandle<T> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if let Some(abort) = this.abort.take() {
let _error = abort.send(());
}
}
}

View File

@@ -6,10 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.56"
dashmap = "5.3.4"
deno_core = "=0.136.0"
deno_ast = { version = "=0.15.0", features = ["transpiling"] }
dprint-swc-ext = "=0.1.1"
embassy_container_init = { path = "../embassy-container-init" }
reqwest = { version = "0.11.11" }
swc_atoms = "=0.2.11"
swc_common = "=0.18.7"

View File

@@ -37,10 +37,37 @@ const writeFile = (
const readFile = (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
) => Deno.core.opAsync("read_file", volumeId, path);
const runDaemon = (
{ command = requireParam("command"), args = [] } = requireParam("options"),
) => {
let id = Deno.core.opAsync("start_command", command, args);
let waitPromise = null;
return {
async wait() {
waitPromise = waitPromise || Deno.core.opAsync("wait_command", await id)
return waitPromise
},
async term() {
return Deno.core.opAsync("term_command", await id)
}
}
};
const runCommand = async (
{ command = requireParam("command"), args = [], timeoutMillis = 30000 } = requireParam("options"),
) => {
let id = Deno.core.opAsync("start_command", command, args, timeoutMillis);
return Deno.core.opAsync("wait_command", await id)
};
const sleep = (timeMs = requireParam("timeMs"),
) => Deno.core.opAsync("sleep", timeMs);
const rename = (
{
srcVolume = requireParam("srcVolume"),
dstVolume = requireParam("dstVolume"),
dstVolume = requirePapram("dstVolume"),
srcPath = requireParam("srcPath"),
dstPath = requireParam("dstPath"),
} = requireParam("options"),
@@ -122,6 +149,9 @@ const effects = {
removeDir,
metadata,
rename,
runCommand,
sleep,
runDaemon
};
const runFunction = jsonPointerValue(mainModule, currentFunction);

View File

@@ -1,3 +1,5 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
@@ -9,11 +11,12 @@ use deno_core::{
resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture,
ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot,
};
use helpers::{script_dir, NonDetachingJoinHandle};
use models::{PackageId, ProcedureName, Version, VolumeId};
use helpers::{script_dir, SingleThreadJoinHandle};
use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
pub trait PathForVolumeId: Send + Sync {
fn path_for(
@@ -80,6 +83,8 @@ const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/JS_SNAPSHOT.bin");
#[cfg(target_arch = "aarch64")]
const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/ARM_JS_SNAPSHOT.bin");
type WaitFns = Arc<Mutex<BTreeMap<u32, Pin<Box<dyn Future<Output = ResultType>>>>>>;
#[derive(Clone)]
struct JsContext {
sandboxed: bool,
@@ -90,8 +95,17 @@ struct JsContext {
volumes: Arc<dyn PathForVolumeId>,
input: Value,
variable_args: Vec<serde_json::Value>,
command_inserter: ExecCommand,
term_command: TermCommand,
wait_fns: WaitFns,
}
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
enum ResultType {
Error(String),
ErrorCode(i32, String),
Result(serde_json::Value),
}
#[derive(Clone, Default)]
struct AnswerState(std::sync::Arc<deno_core::parking_lot::Mutex<Value>>);
@@ -162,6 +176,7 @@ impl ModuleLoader for ModsLoader {
})
}
}
pub struct JsExecutionEnvironment {
sandboxed: bool,
base_directory: PathBuf,
@@ -169,6 +184,8 @@ pub struct JsExecutionEnvironment {
package_id: PackageId,
version: Version,
volumes: Arc<dyn PathForVolumeId>,
command_inserter: ExecCommand,
term_command: TermCommand,
}
impl JsExecutionEnvironment {
@@ -177,7 +194,9 @@ impl JsExecutionEnvironment {
package_id: &PackageId,
version: &Version,
volumes: Box<dyn PathForVolumeId>,
) -> Result<Self, (JsError, String)> {
command_inserter: ExecCommand,
term_command: TermCommand,
) -> Result<JsExecutionEnvironment, (JsError, String)> {
let data_dir = data_directory.as_ref();
let base_directory = data_dir;
let js_code = JsCode({
@@ -203,13 +222,15 @@ impl JsExecutionEnvironment {
};
buffer
});
Ok(Self {
Ok(JsExecutionEnvironment {
base_directory: base_directory.to_owned(),
module_loader: ModsLoader { code: js_code },
package_id: package_id.clone(),
version: version.clone(),
volumes: volumes.into(),
sandboxed: false,
command_inserter,
term_command,
})
}
pub fn read_only_effects(mut self) -> Self {
@@ -234,12 +255,9 @@ impl JsExecutionEnvironment {
));
}
};
let safer_handle: NonDetachingJoinHandle<_> =
tokio::task::spawn_blocking(move || self.execute(procedure_name, input, variable_args))
.into();
let output = safer_handle
.await
.map_err(|err| (JsError::Tokio, format!("Tokio gave us the error: {}", err)))??;
let safer_handle =
SingleThreadJoinHandle::new(move || self.execute(procedure_name, input, variable_args));
let output = safer_handle.await?;
match serde_json::from_value(output.clone()) {
Ok(x) => Ok(x),
Err(err) => {
@@ -275,11 +293,15 @@ impl JsExecutionEnvironment {
fns::get_variable_args::decl(),
fns::set_value::decl(),
fns::is_sandboxed::decl(),
fns::start_command::decl(),
fns::wait_command::decl(),
fns::sleep::decl(),
fns::term_command::decl(),
]
}
fn execute(
&self,
async fn execute(
self,
procedure_name: ProcedureName,
input: Value,
variable_args: Vec<serde_json::Value>,
@@ -304,6 +326,9 @@ impl JsExecutionEnvironment {
sandboxed: self.sandboxed,
input,
variable_args,
command_inserter: self.command_inserter.clone(),
term_command: self.term_command.clone(),
wait_fns: Default::default(),
};
let ext = Extension::builder()
.ops(Self::declarations())
@@ -321,25 +346,25 @@ impl JsExecutionEnvironment {
startup_snapshot: Some(Snapshot::Static(SNAPSHOT_BYTES)),
..Default::default()
};
let mut runtime = JsRuntime::new(runtime_options);
let runtime = Arc::new(Mutex::new(JsRuntime::new(runtime_options)));
let future = async move {
let mod_id = runtime
.lock()
.await
.load_main_module(&"file:///loadModule.js".parse().unwrap(), None)
.await?;
let evaluated = runtime.mod_evaluate(mod_id);
let res = runtime.run_event_loop(false).await;
let evaluated = runtime.lock().await.mod_evaluate(mod_id);
let res = runtime.lock().await.run_event_loop(false).await;
res?;
evaluated.await??;
Ok::<_, AnyError>(())
};
tokio::runtime::Handle::current()
.block_on(future)
.map_err(|e| {
tracing::debug!("{:?}", e);
(JsError::Javascript, format!("{}", e))
})?;
future.await.map_err(|e| {
tracing::debug!("{:?}", e);
(JsError::Javascript, format!("{}", e))
})?;
let answer = answer_state.0.lock().clone();
Ok(answer)
@@ -348,23 +373,24 @@ impl JsExecutionEnvironment {
/// Note: Make sure that we have the assumption that all these methods are callable at any time, and all call restrictions should be in rust
mod fns {
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::{cell::RefCell, time::Duration};
use deno_core::anyhow::{anyhow, bail};
use deno_core::error::AnyError;
use deno_core::*;
use embassy_container_init::RpcId;
use helpers::{to_tmp_path, AtomicFile};
use models::VolumeId;
use models::{TermCommand, VolumeId};
use serde_json::Value;
use tokio::io::AsyncWriteExt;
use super::{AnswerState, JsContext};
use crate::{system_time_as_unix_ms, MetadataJs};
use crate::{system_time_as_unix_ms, MetadataJs, ResultType};
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Default)]
struct FetchOptions {
@@ -386,10 +412,13 @@ mod fns {
url: url::Url,
options: Option<FetchOptions>,
) -> Result<FetchResponse, AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let sandboxed = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
ctx.sandboxed
};
if ctx.sandboxed {
if sandboxed {
bail!("Will not run fetch in sandboxed mode");
}
@@ -432,7 +461,7 @@ mod fns {
body: response.text().await.ok(),
};
return Ok(fetch_response);
Ok(fetch_response)
}
#[op]
@@ -441,12 +470,13 @@ mod fns {
volume_id: VolumeId,
path_in: PathBuf,
) -> Result<String, AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
let volume_path = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
ctx.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?
};
//get_path_for in volume.rs
let new_file = volume_path.join(path_in);
if !is_subset(&volume_path, &new_file).await? {
@@ -465,12 +495,13 @@ mod fns {
volume_id: VolumeId,
path_in: PathBuf,
) -> Result<MetadataJs, AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
let volume_path = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
ctx.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?
};
//get_path_for in volume.rs
let new_file = volume_path.join(path_in);
if !is_subset(&volume_path, &new_file).await? {
@@ -517,13 +548,16 @@ mod fns {
path_in: PathBuf,
write: String,
) -> Result<(), AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
if ctx.volumes.readonly(&volume_id) {
let (volumes, volume_path) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
(ctx.volumes.clone(), volume_path)
};
if volumes.readonly(&volume_id) {
bail!("Volume {} is readonly", volume_id);
}
@@ -566,17 +600,20 @@ mod fns {
dst_volume: VolumeId,
dst_path: PathBuf,
) -> Result<(), AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &src_volume)
.ok_or_else(|| anyhow!("There is no {} in volumes", src_volume))?;
let volume_path_out = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &dst_volume)
.ok_or_else(|| anyhow!("There is no {} in volumes", dst_volume))?;
if ctx.volumes.readonly(&dst_volume) {
let (volumes, volume_path, volume_path_out) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &src_volume)
.ok_or_else(|| anyhow!("There is no {} in volumes", src_volume))?;
let volume_path_out = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &dst_volume)
.ok_or_else(|| anyhow!("There is no {} in volumes", dst_volume))?;
(ctx.volumes.clone(), volume_path, volume_path_out)
};
if volumes.readonly(&dst_volume) {
bail!("Volume {} is readonly", dst_volume);
}
@@ -614,13 +651,16 @@ mod fns {
volume_id: VolumeId,
path_in: PathBuf,
) -> Result<(), AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
if ctx.volumes.readonly(&volume_id) {
let (volumes, volume_path) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
(ctx.volumes.clone(), volume_path)
};
if volumes.readonly(&volume_id) {
bail!("Volume {} is readonly", volume_id);
}
let new_file = volume_path.join(path_in);
@@ -641,13 +681,16 @@ mod fns {
volume_id: VolumeId,
path_in: PathBuf,
) -> Result<(), AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
if ctx.volumes.readonly(&volume_id) {
let (volumes, volume_path) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
(ctx.volumes.clone(), volume_path)
};
if volumes.readonly(&volume_id) {
bail!("Volume {} is readonly", volume_id);
}
let new_file = volume_path.join(path_in);
@@ -668,13 +711,16 @@ mod fns {
volume_id: VolumeId,
path_in: PathBuf,
) -> Result<(), AnyError> {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
if ctx.volumes.readonly(&volume_id) {
let (volumes, volume_path) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
(ctx.volumes.clone(), volume_path)
};
if volumes.readonly(&volume_id) {
bail!("Volume {} is readonly", volume_id);
}
let new_file = volume_path.join(path_in);
@@ -777,6 +823,102 @@ mod fns {
Ok(ctx.sandboxed)
}
#[op]
async fn term_command(state: Rc<RefCell<OpState>>, id: u32) -> Result<(), AnyError> {
let term_command_impl: TermCommand = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
ctx.term_command.clone()
};
if let Err(err) = term_command_impl(embassy_container_init::RpcId::UInt(id)).await {
bail!("{}", err);
}
Ok(())
}
#[op]
async fn start_command(
state: Rc<RefCell<OpState>>,
command: String,
args: Vec<String>,
timeout: Option<u64>,
) -> Result<u32, AnyError> {
use embassy_container_init::Output;
let (command_inserter, wait_fns) = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
(ctx.command_inserter.clone(), ctx.wait_fns.clone())
};
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Output>();
let id = match command_inserter(
command,
args.into_iter().collect(),
sender,
timeout.map(std::time::Duration::from_millis),
)
.await
{
Err(err) => bail!(err),
Ok(RpcId::UInt(a)) => a,
};
let wait = async move {
let mut answer = String::new();
let mut command_error = String::new();
let mut status: Option<i32> = None;
while let Some(output) = receiver.recv().await {
match output {
Output::Line(value) => {
answer.push_str(&value);
answer.push('\n');
}
Output::Error(error) => {
command_error.push_str(&error);
command_error.push('\n');
}
Output::Done(error_code) => {
status = error_code;
break;
}
}
}
if !command_error.is_empty() {
if let Some(status) = status {
return ResultType::ErrorCode(status, command_error);
}
return ResultType::Error(command_error);
}
ResultType::Result(serde_json::Value::String(answer))
};
wait_fns.lock().await.insert(id, Box::pin(wait));
Ok(id)
}
#[op]
async fn wait_command(state: Rc<RefCell<OpState>>, id: u32) -> Result<ResultType, AnyError> {
let wait_fns = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
ctx.wait_fns.clone()
};
let found_future = match wait_fns.lock().await.remove(&id) {
Some(a) => a,
None => bail!("No future for id {id}, could have been removed already"),
};
Ok(found_future.await)
}
#[op]
async fn sleep(time_ms: u64) -> Result<(), AnyError> {
tokio::time::sleep(Duration::from_millis(time_ms)).await;
Ok(())
}
/// We need to make sure that during the file accessing, we don't reach beyond our scope of control
async fn is_subset(
parent: impl AsRef<Path>,

View File

@@ -6,10 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
embassy_container_init = { path = "../embassy-container-init" }
emver = { version = "0.1", features = ["serde"] }
patch-db = { version = "*", path = "../../patch-db/patch-db", features = [
"trace",
] }
serde = { version = "1.0", features = ["derive", "rc"] }
thiserror = "1.0"
emver = { version = "0.1", features = ["serde"] }
rand = "0.8"
tokio = { version = "1", features = ["full"] }
thiserror = "1.0"

View File

@@ -6,6 +6,7 @@ mod interface_id;
mod invalid_id;
mod package_id;
mod procedure_name;
mod type_aliases;
mod version;
mod volume_id;
@@ -17,5 +18,6 @@ pub use interface_id::*;
pub use invalid_id::*;
pub use package_id::*;
pub use procedure_name::*;
pub use type_aliases::*;
pub use version::*;
pub use volume_id::*;

View File

@@ -35,7 +35,7 @@ impl ProcedureName {
}
pub fn js_function_name(&self) -> Option<String> {
match self {
ProcedureName::Main => None,
ProcedureName::Main => Some("/main".to_string()),
ProcedureName::LongRunning => None,
ProcedureName::CreateBackup => Some("/createBackup".to_string()),
ProcedureName::RestoreBackup => Some("/restoreBackup".to_string()),

View File

@@ -0,0 +1,25 @@
use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
use embassy_container_init::RpcId;
use tokio::sync::mpsc::UnboundedSender;
/// Used by the js-executor, it is the ability to just create a command in an already running exec
pub type ExecCommand = Arc<
dyn Fn(
String,
Vec<String>,
UnboundedSender<embassy_container_init::Output>,
Option<Duration>,
) -> Pin<Box<dyn Future<Output = Result<RpcId, String>> + 'static>>
+ Send
+ Sync
+ 'static,
>;
/// Used by the js-executor, it is the ability to just create a command in an already running exec
pub type TermCommand = Arc<
dyn Fn(RpcId) -> Pin<Box<dyn Future<Output = Result<(), String>> + 'static>>
+ Send
+ Sync
+ 'static,
>;

File diff suppressed because it is too large Load Diff