feat: Exposing the rsync that we have to the js (#1907)

chore: Make the commit do by checksum.

chore: Remove the logging at the start

chore: use the defaults of the original.

chore: Convert the error into just the source.

chore: Remove some of the unwraps
This commit is contained in:
J M
2022-11-09 12:19:08 -07:00
committed by Aiden McClelland
parent 0e82b6981f
commit 67b54ac1eb
32 changed files with 2001 additions and 439 deletions

View File

@@ -44,14 +44,18 @@ const runDaemon = (
{ command = requireParam("command"), args = [] } = requireParam("options"),
) => {
let id = Deno.core.opAsync("start_command", command, args);
let rpcId = id.then(x => x.rpcId)
let processId = id.then(x => x.processId)
let waitPromise = null;
return {
processId,
rpcId,
async wait() {
waitPromise = waitPromise || Deno.core.opAsync("wait_command", await id)
waitPromise = waitPromise || Deno.core.opAsync("wait_command", await rpcId)
return waitPromise
},
async term() {
return Deno.core.opAsync("term_command", await id)
return Deno.core.opAsync("term_command", await rpcId)
}
}
};
@@ -128,6 +132,31 @@ const fetch = async (url = requireParam ('url'), options = null) => {
};
};
const runRsync = (
{
srcVolume = requireParam("srcVolume"),
dstVolume = requireParam("dstVolume"),
srcPath = requireParam("srcPath"),
dstPath = requireParam("dstPath"),
options = requireParam("options"),
} = requireParam("options"),
) => {
let id = Deno.core.opAsync("rsync", srcVolume, srcPath, dstVolume, dstPath, options);
let waitPromise = null;
return {
async id() {
return id
},
async wait() {
waitPromise = waitPromise || Deno.core.opAsync("rsync_wait", await id)
return waitPromise
},
async progress() {
return Deno.core.opAsync("rsync_progress", await id)
}
}
};
const currentFunction = Deno.core.opSync("current_function");
const input = Deno.core.opSync("get_input");
const variable_args = Deno.core.opSync("get_variable_args");
@@ -151,7 +180,8 @@ const effects = {
rename,
runCommand,
sleep,
runDaemon
runDaemon,
runRsync
};
const runFunction = jsonPointerValue(mainModule, currentFunction);

View File

@@ -11,7 +11,8 @@ use deno_core::{
resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture,
ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot,
};
use helpers::{script_dir, spawn_local};
use embassy_container_init::RpcId;
use helpers::{script_dir, spawn_local, Rsync};
use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -83,7 +84,7 @@ const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/JS_SNAPSHOT.bin");
#[cfg(target_arch = "aarch64")]
const SNAPSHOT_BYTES: &[u8] = include_bytes!("./artifacts/ARM_JS_SNAPSHOT.bin");
type WaitFns = Arc<Mutex<BTreeMap<u32, Pin<Box<dyn Future<Output = ResultType>>>>>>;
type WaitFns = Arc<Mutex<BTreeMap<RpcId, Pin<Box<dyn Future<Output = ResultType>>>>>>;
#[derive(Clone)]
struct JsContext {
@@ -98,6 +99,7 @@ struct JsContext {
command_inserter: ExecCommand,
term_command: TermCommand,
wait_fns: WaitFns,
rsyncs: Arc<Mutex<(usize, BTreeMap<usize, Rsync>)>>,
}
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
@@ -262,13 +264,13 @@ impl JsExecutionEnvironment {
Err(err) => {
tracing::error!("{}", err);
tracing::debug!("{:?}", err);
return Err((
Err((
JsError::BoundryLayerSerDe,
format!(
"Couldn't convert output = {:#?} to the correct type",
serde_json::to_string_pretty(&output).unwrap_or_default()
),
));
))
}
}
}
@@ -296,6 +298,9 @@ impl JsExecutionEnvironment {
fns::wait_command::decl(),
fns::sleep::decl(),
fns::term_command::decl(),
fns::rsync::decl(),
fns::rsync_wait::decl(),
fns::rsync_progress::decl(),
]
}
@@ -328,6 +333,7 @@ impl JsExecutionEnvironment {
command_inserter: self.command_inserter.clone(),
term_command: self.term_command.clone(),
wait_fns: Default::default(),
rsyncs: Default::default(),
};
let ext = Extension::builder()
.ops(Self::declarations())
@@ -382,9 +388,10 @@ mod fns {
use deno_core::anyhow::{anyhow, bail};
use deno_core::error::AnyError;
use deno_core::*;
use embassy_container_init::RpcId;
use helpers::{to_tmp_path, AtomicFile};
use embassy_container_init::{ProcessId, RpcId};
use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions};
use models::{TermCommand, VolumeId};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncWriteExt;
@@ -644,6 +651,106 @@ mod fns {
tokio::fs::rename(old_file, new_file).await?;
Ok(())
}
#[op]
async fn rsync(
state: Rc<RefCell<OpState>>,
src_volume: VolumeId,
src_path: PathBuf,
dst_volume: VolumeId,
dst_path: PathBuf,
options: RsyncOptions,
) -> Result<usize, AnyError> {
let (volumes, volume_path, volume_path_out, rsyncs) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &src_volume)
.ok_or_else(|| anyhow!("There is no {} in volumes", src_volume))?;
let volume_path_out = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &dst_volume)
.ok_or_else(|| anyhow!("There is no {} in volumes", dst_volume))?;
(
ctx.volumes.clone(),
volume_path,
volume_path_out,
ctx.rsyncs.clone(),
)
};
if volumes.readonly(&dst_volume) {
bail!("Volume {} is readonly", dst_volume);
}
let src = volume_path.join(src_path);
// With the volume check
if !is_subset(&volume_path, &src).await? {
bail!(
"Path '{}' has broken away from parent '{}'",
src.to_string_lossy(),
volume_path.to_string_lossy(),
);
}
if let Err(_) = tokio::fs::metadata(&src).await {
bail!("Source at {} does not exists", src.to_string_lossy());
}
let dst = volume_path_out.join(dst_path);
// With the volume check
if !is_subset(&volume_path_out, &dst).await? {
bail!(
"Path '{}' has broken away from parent '{}'",
dst.to_string_lossy(),
volume_path_out.to_string_lossy(),
);
}
let running_rsync =
Rsync::new(src, dst, options).map_err(|e| anyhow::anyhow!("{:?}", e.source))?;
let insert_id = {
let mut rsyncs = rsyncs.lock().await;
let next = rsyncs.0 + 1;
rsyncs.0 = next;
rsyncs.1.insert(next, running_rsync);
next
};
Ok(insert_id)
}
#[op]
async fn rsync_wait(state: Rc<RefCell<OpState>>, id: usize) -> Result<(), AnyError> {
let rsyncs = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
ctx.rsyncs.clone()
};
let running_rsync = match rsyncs.lock().await.1.remove(&id) {
Some(a) => a,
None => bail!("Couldn't find rsync at id {id}"),
};
running_rsync
.wait()
.await
.map_err(|x| anyhow::anyhow!("{}", x.source))?;
Ok(())
}
#[op]
async fn rsync_progress(state: Rc<RefCell<OpState>>, id: usize) -> Result<f64, AnyError> {
use futures::StreamExt;
let rsyncs = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
ctx.rsyncs.clone()
};
let mut running_rsync = match rsyncs.lock().await.1.remove(&id) {
Some(a) => a,
None => bail!("Couldn't find rsync at id {id}"),
};
let progress = running_rsync.progress.next().await.unwrap_or_default();
rsyncs.lock().await.1.insert(id, running_rsync);
Ok(progress)
}
#[op]
async fn remove_file(
state: Rc<RefCell<OpState>>,
@@ -835,13 +942,20 @@ mod fns {
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StartCommand {
rpc_id: RpcId,
process_id: ProcessId,
}
#[op]
async fn start_command(
state: Rc<RefCell<OpState>>,
command: String,
args: Vec<String>,
timeout: Option<u64>,
) -> Result<u32, AnyError> {
) -> Result<StartCommand, AnyError> {
use embassy_container_init::Output;
let (command_inserter, wait_fns) = {
let state = state.borrow();
@@ -850,7 +964,7 @@ mod fns {
};
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Output>();
let id = match command_inserter(
let rpc_id = match command_inserter(
command,
args.into_iter().collect(),
sender,
@@ -859,15 +973,28 @@ mod fns {
.await
{
Err(err) => bail!(err),
Ok(RpcId::UInt(a)) => a,
Ok(rpc_id) => rpc_id,
};
let (process_id_send, process_id_recv) = tokio::sync::oneshot::channel::<ProcessId>();
let wait = async move {
let mut answer = String::new();
let mut command_error = String::new();
let mut status: Option<i32> = None;
let mut process_id_send = Some(process_id_send);
while let Some(output) = receiver.recv().await {
match output {
Output::ProcessId(process_id) => {
if let Some(process_id_send) = process_id_send.take() {
if let Err(err) = process_id_send.send(process_id) {
tracing::error!(
"Could not get a process id {process_id:?} sent for {rpc_id:?}"
);
tracing::debug!("{err:?}");
}
}
}
Output::Line(value) => {
answer.push_str(&value);
answer.push('\n');
@@ -892,12 +1019,13 @@ mod fns {
ResultType::Result(serde_json::Value::String(answer))
};
wait_fns.lock().await.insert(id, Box::pin(wait));
Ok(id)
wait_fns.lock().await.insert(rpc_id, Box::pin(wait));
let process_id = process_id_recv.await?;
Ok(StartCommand { rpc_id, process_id })
}
#[op]
async fn wait_command(state: Rc<RefCell<OpState>>, id: u32) -> Result<ResultType, AnyError> {
async fn wait_command(state: Rc<RefCell<OpState>>, id: RpcId) -> Result<ResultType, AnyError> {
let wait_fns = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
@@ -906,7 +1034,7 @@ mod fns {
let found_future = match wait_fns.lock().await.remove(&id) {
Some(a) => a,
None => bail!("No future for id {id}, could have been removed already"),
None => bail!("No future for id {id:?}, could have been removed already"),
};
Ok(found_future.await)