wip: Starting down the bind for the effects

todo: complete a ip todo

chore: Fix the result type on something

todo: Address returning

chore: JS with callbacks

chore: Add in the chown and permissions

chore: Add in the binds and unbinds in
This commit is contained in:
BluJ
2023-02-06 12:14:48 -07:00
committed by Aiden McClelland
parent 550b17552b
commit 9366dbb96e
10 changed files with 1237 additions and 460 deletions

View File

@@ -1,6 +1,7 @@
use color_eyre::eyre::eyre;
use models::Error;
use color_eyre::Report;
use models::PackageId;
use models::{Error, InterfaceId};
use serde_json::Value;
pub struct RuntimeDropped;
@@ -13,6 +14,28 @@ fn method_not_available() -> Error {
models::ErrorKind::InvalidRequest,
)
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct AddressSchemaOnion {
pub id: InterfaceId,
pub external_port: u16,
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct AddressSchemaLocal {
pub id: InterfaceId,
pub external_port: u16,
}
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Address(pub String);
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Domain;
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Name;
#[async_trait::async_trait]
#[allow(unused_variables)]
@@ -22,7 +45,47 @@ pub trait OsApi: Send + Sync + 'static {
id: PackageId,
path: &str,
callback: Callback,
) -> Result<Value, Error> {
Err(method_not_available())
) -> Result<Value, Report>;
async fn bind_local(
&self,
internal_port: u16,
address_schema: AddressSchemaLocal,
) -> Result<Address, Report>;
async fn bind_onion(
&self,
internal_port: u16,
address_schema: AddressSchemaOnion,
) -> Result<Address, Report>;
async fn unbind_local(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
todo!()
}
async fn unbind_onion(&self, id: InterfaceId, external: u16) -> Result<(), Report> {
todo!()
}
async fn list_address(&self) -> Result<Vec<Address>, Report> {
todo!()
}
async fn list_domains(&self) -> Result<Vec<Domain>, Report> {
todo!()
}
async fn alloc_onion(&self, id: String) -> Result<Name, Report> {
todo!()
}
async fn dealloc_onion(&self, id: String) -> Result<(), Report> {
todo!()
}
async fn alloc_local(&self, id: String) -> Result<Name, Report> {
todo!()
}
async fn dealloc_local(&self, id: String) -> Result<(), Report> {
todo!()
}
async fn alloc_forward(&self, id: String) -> Result<u16, Report> {
todo!()
}
async fn dealloc_forward(&self, id: String) -> Result<(), Report> {
todo!()
}
}

View File

@@ -70,6 +70,9 @@ impl Rsync {
for exclude in options.exclude {
cmd.arg(format!("--exclude={}", exclude));
}
if options.no_permissions {
cmd.arg("--no-perms");
}
let mut command = cmd
.arg("-acAXH")
.arg("--info=progress2")

View File

@@ -1,10 +1,24 @@
import Deno from "/deno_global.js";
import * as mainModule from "/embassy.js";
// throw new Error("I'm going crasy")
function requireParam(param) {
throw new Error(`Missing required parameter ${param}`);
}
const callbackName = (() => {
let count = 0;
return () => `callback${count++}${Math.floor(Math.random() * 100000)}`;
})();
const callbackMapping = {};
const registerCallback = (fn) => {
const uuid = callbackName(); // TODO
callbackMapping[uuid] = fn;
return uuid;
};
/**
* This is using the simplified json pointer spec, using no escapes and arrays
* @param {object} obj
@@ -35,42 +49,75 @@ const writeFile = (
) => Deno.core.opAsync("write_file", volumeId, path, toWrite);
const readFile = (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
} = requireParam("options"),
) => Deno.core.opAsync("read_file", volumeId, path);
const runDaemon = (
{ command = requireParam("command"), args = [] } = requireParam("options"),
) => {
let id = Deno.core.opAsync("start_command", command, args, "inherit", null);
let processId = id.then(x => x.processId)
let processId = id.then((x) => x.processId);
let waitPromise = null;
return {
processId,
async wait() {
waitPromise = waitPromise || Deno.core.opAsync("wait_command", await processId)
return waitPromise
waitPromise = waitPromise ||
Deno.core.opAsync("wait_command", await processId);
return waitPromise;
},
async term(signal = 15) {
return Deno.core.opAsync("send_signal", await processId, 15)
}
}
return Deno.core.opAsync("send_signal", await processId, 15);
},
};
};
const runCommand = async (
{ command = requireParam("command"), args = [], timeoutMillis = 30000 } = requireParam("options"),
{
command = requireParam("command"),
args = [],
timeoutMillis = 30000,
} = requireParam("options"),
) => {
let id = Deno.core.opAsync("start_command", command, args, "collect", timeoutMillis);
let pid = id.then(x => x.processId)
return Deno.core.opAsync("wait_command", await pid)
let id = Deno.core.opAsync(
"start_command",
command,
args,
"collect",
timeoutMillis,
);
let pid = id.then((x) => x.processId);
return Deno.core.opAsync("wait_command", await pid);
};
const bindLocal = async (
{
internalPort = requireParam("internalPort"),
name = requireParam("name"),
externalPort = requireParam("externalPort"),
} = requireParam("options"),
) => {
return Deno.core.opAsync("bind_local", internalPort, { name, externalPort });
};
const bindTor = async (
{
internalPort = requireParam("internalPort"),
name = requireParam("name"),
externalPort = requireParam("externalPort"),
} = requireParam("options"),
) => {
return Deno.core.opAsync("bind_onion", internalPort, { name, externalPort });
};
const signalGroup = async (
{ gid = requireParam("gid"), signal = requireParam("signal") } = requireParam("gid and signal")
{ gid = requireParam("gid"), signal = requireParam("signal") } = requireParam(
"gid and signal",
),
) => {
return Deno.core.opAsync("signal_group", gid, signal);
};
const sleep = (timeMs = requireParam("timeMs"),
) => Deno.core.opAsync("sleep", timeMs);
const sleep = (timeMs = requireParam("timeMs")) =>
Deno.core.opAsync("sleep", timeMs);
const rename = (
{
@@ -81,7 +128,10 @@ const rename = (
} = requireParam("options"),
) => Deno.core.opAsync("rename", srcVolume, srcPath, dstVolume, dstPath);
const metadata = async (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
} = requireParam("options"),
) => {
const data = await Deno.core.opAsync("metadata", volumeId, path);
return {
@@ -92,7 +142,10 @@ const metadata = async (
};
};
const removeFile = (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
} = requireParam("options"),
) => Deno.core.opAsync("remove_file", volumeId, path);
const isSandboxed = () => Deno.core.opSync("is_sandboxed");
@@ -129,24 +182,38 @@ const chmod = async (
return await Deno.core.opAsync("chmod", volumeId, path, mode);
};
const readJsonFile = async (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
} = requireParam("options"),
) => JSON.parse(await readFile({ volumeId, path }));
const createDir = (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
} = requireParam("options"),
) => Deno.core.opAsync("create_dir", volumeId, path);
const readDir = (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
) => Deno.core.opAsync("read_dir", volumeId, path);
const removeDir = (
{ volumeId = requireParam("volumeId"), path = requireParam("path") } = requireParam("options"),
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
} = requireParam("options"),
) => Deno.core.opAsync("remove_dir", volumeId, path);
const trace = (whatToTrace = requireParam('whatToTrace')) => Deno.core.opAsync("log_trace", whatToTrace);
const warn = (whatToTrace = requireParam('whatToTrace')) => Deno.core.opAsync("log_warn", whatToTrace);
const error = (whatToTrace = requireParam('whatToTrace')) => Deno.core.opAsync("log_error", whatToTrace);
const debug = (whatToTrace = requireParam('whatToTrace')) => Deno.core.opAsync("log_debug", whatToTrace);
const info = (whatToTrace = requireParam('whatToTrace')) => Deno.core.opAsync("log_info", whatToTrace);
const fetch = async (url = requireParam ('url'), options = null) => {
const trace = (whatToTrace = requireParam("whatToTrace")) =>
Deno.core.opAsync("log_trace", whatToTrace);
const warn = (whatToTrace = requireParam("whatToTrace")) =>
Deno.core.opAsync("log_warn", whatToTrace);
const error = (whatToTrace = requireParam("whatToTrace")) =>
Deno.core.opAsync("log_error", whatToTrace);
const debug = (whatToTrace = requireParam("whatToTrace")) =>
Deno.core.opAsync("log_debug", whatToTrace);
const info = (whatToTrace = requireParam("whatToTrace")) =>
Deno.core.opAsync("log_info", whatToTrace);
const fetch = async (url = requireParam("url"), options = null) => {
const { body, ...response } = await Deno.core.opAsync("fetch", url, options);
const textValue = Promise.resolve(body);
return {
@@ -161,28 +228,35 @@ const fetch = async (url = requireParam ('url'), options = null) => {
};
const runRsync = (
{
{
srcVolume = requireParam("srcVolume"),
dstVolume = requireParam("dstVolume"),
srcPath = requireParam("srcPath"),
dstPath = requireParam("dstPath"),
options = requireParam("options"),
} = requireParam("options"),
} = requireParam("options"),
) => {
let id = Deno.core.opAsync("rsync", srcVolume, srcPath, dstVolume, dstPath, options);
let id = Deno.core.opAsync(
"rsync",
srcVolume,
srcPath,
dstVolume,
dstPath,
options,
);
let waitPromise = null;
return {
async id() {
return id
return id;
},
async wait() {
waitPromise = waitPromise || Deno.core.opAsync("rsync_wait", await id)
return waitPromise
waitPromise = waitPromise || Deno.core.opAsync("rsync_wait", await id);
return waitPromise;
},
async progress() {
return Deno.core.opAsync("rsync_progress", await id)
}
}
return Deno.core.opAsync("rsync_progress", await id);
},
};
};
const diskUsage = async ({
@@ -193,63 +267,105 @@ const diskUsage = async ({
return { used, total }
}
const callbackMapping = {}
const registerCallback = (fn) => {
const uuid = generateUuid(); // TODO
callbackMapping[uuid] = fn;
return uuid
}
const runCallback = (uuid, data) => callbackMapping[uuid](data)
globalThis.runCallback = (uuid, data) => callbackMapping[uuid](data);
// window.runCallback = runCallback;
// Deno.runCallback = runCallback;
const getServiceConfig = async (serviceId, configPath, onChange) => {
await Deno.core.opAsync("get_service_config", serviceId, configPath, registerCallback(onChange))
}
const getServiceConfig = async (
{
serviceId = requireParam("serviceId"),
configPath = requireParam("configPath"),
onChange = requireParam("onChange"),
} = requireParam("options"),
) => {
return await Deno.core.opAsync(
"get_service_config",
serviceId,
configPath,
registerCallback(onChange),
);
};
const setPermissions = async (
{
volumeId = requireParam("volumeId"),
path = requireParam("path"),
readonly = requireParam("readonly"),
} = requireParam("options"),
) => {
return await Deno.core.opAsync("set_permissions", volumeId, path, readonly);
};
const currentFunction = Deno.core.opSync("current_function");
const input = Deno.core.opSync("get_input");
const variable_args = Deno.core.opSync("get_variable_args");
const setState = (x) => Deno.core.opSync("set_value", x);
const setState = (x) => Deno.core.opAsync("set_value", x);
const effects = {
bindLocal,
bindTor,
chmod,
chown,
writeFile,
readFile,
writeJsonFile,
readJsonFile,
error,
warn,
createDir,
debug,
trace,
diskUsage,
error,
fetch,
getServiceConfig,
getServiceConfig,
info,
isSandboxed,
fetch,
removeFile,
createDir,
removeDir,
metadata,
readDir,
readFile,
readJsonFile,
removeDir,
removeFile,
rename,
runCommand,
sleep,
runDaemon,
signalGroup,
runRsync,
readDir,
diskUsage,
getServiceConfig,
setPermissions,
signalGroup,
sleep,
trace,
warn,
writeFile,
writeJsonFile,
};
const defaults = {
"handleSignal": (effects, { gid, signal }) => {
return effects.signalGroup({ gid, signal })
handleSignal: (effects, { gid, signal }) => {
return effects.signalGroup({ gid, signal });
},
};
function safeToString(fn, orValue = "") {
try {
return fn();
} catch (e) {
return orValue;
}
}
const runFunction = jsonPointerValue(mainModule, currentFunction) || jsonPointerValue(defaults, currentFunction);
const runFunction = jsonPointerValue(mainModule, currentFunction) ||
jsonPointerValue(defaults, currentFunction);
(async () => {
if (typeof runFunction !== "function") {
error(`Expecting ${currentFunction} to be a function`);
throw new Error(`Expecting ${currentFunction} to be a function`);
}
const answer = await runFunction(effects, input, ...variable_args);
setState(answer);
const answer = await (async () => {
if (typeof runFunction !== "function") {
error(`Expecting ${currentFunction} to be a function`);
throw new Error(`Expecting ${currentFunction} to be a function`);
}
})()
.then(() => runFunction(effects, input, ...variable_args))
.catch((e) => {
if ("error" in e) return e;
if ("error-code" in e) return e;
return {
error: safeToString(
() => e.toString(),
"Error Not able to be stringified",
),
};
});
await setState(answer);
})();

View File

@@ -109,8 +109,15 @@ enum ResultType {
ErrorCode(i32, String),
Result(serde_json::Value),
}
#[derive(Clone, Default)]
struct AnswerState(std::sync::Arc<deno_core::parking_lot::Mutex<Value>>);
#[derive(Clone)]
struct AnswerState(mpsc::Sender<Value>);
impl AnswerState {
fn new() -> (Self, mpsc::Receiver<Value>) {
let (send, recv) = mpsc::channel(1);
(Self(send), recv)
}
}
#[derive(Clone, Debug)]
struct ModsLoader {
@@ -282,6 +289,9 @@ impl JsExecutionEnvironment {
vec![
fns::chown::decl(),
fns::chmod::decl(),
fns::bind_local::decl(),
fns::bind_onion::decl(),
fns::chown::decl(),
fns::fetch::decl(),
fns::read_file::decl(),
fns::metadata::decl(),
@@ -306,6 +316,7 @@ impl JsExecutionEnvironment {
fns::wait_command::decl(),
fns::sleep::decl(),
fns::send_signal::decl(),
fns::set_permissions::decl(),
fns::signal_group::decl(),
fns::rsync::decl(),
fns::rsync_wait::decl(),
@@ -321,7 +332,7 @@ impl JsExecutionEnvironment {
variable_args: Vec<serde_json::Value>,
) -> Result<Value, (JsError, String)> {
let base_directory = self.base_directory.clone();
let answer_state = AnswerState::default();
let (answer_state, mut receive_answer) = AnswerState::new();
let ext_answer_state = answer_state.clone();
let (callback_sender, callback_receiver) = mpsc::unbounded_channel();
let js_ctx = JsContext {
@@ -376,12 +387,18 @@ impl JsExecutionEnvironment {
Ok::<_, AnyError>(())
};
future.await.map_err(|e| {
tracing::debug!("{:?}", e);
(JsError::Javascript, format!("{}", e))
})?;
let answer = tokio::select! {
Some(x) = receive_answer.recv() => x,
_ = future => {
if let Some(x) = receive_answer.recv().await {
x
}
else {
serde_json::json!({"error": "JS Engine Shutdown"})
}
},
let answer = answer_state.0.lock().clone();
};
Ok(answer)
}
}
@@ -399,10 +416,10 @@ impl<'a> Future for RuntimeEventLoop<'a> {
) -> std::task::Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(Some((uuid, value))) = this.callback.poll_recv(cx) {
match this
.runtime
.execute_script("callback", &format!("runCallback({uuid}, {value})"))
{
match this.runtime.execute_script(
"callback",
&format!("globalThis.runCallback(\"{uuid}\", {value})"),
) {
Ok(_) => (),
Err(e) => return Poll::Ready(Err(e)),
}
@@ -440,7 +457,10 @@ mod fns {
OutputParams, OutputStrategy, ProcessGroupId, ProcessId, RunCommand, RunCommandParams,
SendSignal, SendSignalParams, SignalGroup, SignalGroupParams,
};
use helpers::{to_tmp_path, AtomicFile, Rsync, RsyncOptions, RuntimeDropped};
use helpers::{
to_tmp_path, AddressSchemaLocal, AddressSchemaOnion, AtomicFile, Rsync, RsyncOptions,
RuntimeDropped,
};
use itertools::Itertools;
use models::{PackageId, VolumeId};
use serde::{Deserialize, Serialize};
@@ -751,7 +771,7 @@ mod fns {
volume_path.to_string_lossy(),
);
}
if let Err(_) = tokio::fs::metadata(&src).await {
if tokio::fs::metadata(&src).await.is_err() {
bail!("Source at {} does not exists", src.to_string_lossy());
}
@@ -813,6 +833,39 @@ mod fns {
Ok(progress)
}
#[op]
async fn set_permissions(
state: Rc<RefCell<OpState>>,
volume_id: VolumeId,
path_in: PathBuf,
readonly: bool,
) -> Result<(), AnyError> {
let (volumes, volume_path) = {
let state = state.borrow();
let ctx: &JsContext = state.borrow();
let volume_path = ctx
.volumes
.path_for(&ctx.datadir, &ctx.package_id, &ctx.version, &volume_id)
.ok_or_else(|| anyhow!("There is no {} in volumes", volume_id))?;
(ctx.volumes.clone(), volume_path)
};
if volumes.readonly(&volume_id) {
bail!("Volume {} is readonly", volume_id);
}
let new_file = volume_path.join(path_in);
// With the volume check
if !is_subset(&volume_path, &new_file).await? {
bail!(
"Path '{}' has broken away from parent '{}'",
new_file.to_string_lossy(),
volume_path.to_string_lossy(),
);
}
let mut perms = tokio::fs::metadata(&new_file).await?.permissions();
perms.set_readonly(readonly);
tokio::fs::set_permissions(new_file, perms).await?;
Ok(())
}
#[op]
async fn remove_file(
state: Rc<RefCell<OpState>>,
volume_id: VolumeId,
@@ -1039,8 +1092,10 @@ mod fns {
#[op]
async fn log_trace(state: Rc<RefCell<OpState>>, input: String) -> Result<(), AnyError> {
let state = state.borrow();
let ctx = state.borrow::<JsContext>().clone();
let ctx = {
let state = state.borrow();
state.borrow::<JsContext>().clone()
};
if let Some(rpc_client) = ctx.container_rpc_client {
return rpc_client
.request(
@@ -1063,8 +1118,10 @@ mod fns {
}
#[op]
async fn log_warn(state: Rc<RefCell<OpState>>, input: String) -> Result<(), AnyError> {
let state = state.borrow();
let ctx = state.borrow::<JsContext>().clone();
let ctx = {
let state = state.borrow();
state.borrow::<JsContext>().clone()
};
if let Some(rpc_client) = ctx.container_rpc_client {
return rpc_client
.request(
@@ -1087,8 +1144,10 @@ mod fns {
}
#[op]
async fn log_error(state: Rc<RefCell<OpState>>, input: String) -> Result<(), AnyError> {
let state = state.borrow();
let ctx = state.borrow::<JsContext>().clone();
let ctx = {
let state = state.borrow();
state.borrow::<JsContext>().clone()
};
if let Some(rpc_client) = ctx.container_rpc_client {
return rpc_client
.request(
@@ -1111,8 +1170,10 @@ mod fns {
}
#[op]
async fn log_debug(state: Rc<RefCell<OpState>>, input: String) -> Result<(), AnyError> {
let state = state.borrow();
let ctx = state.borrow::<JsContext>().clone();
let ctx = {
let state = state.borrow();
state.borrow::<JsContext>().clone()
};
if let Some(rpc_client) = ctx.container_rpc_client {
return rpc_client
.request(
@@ -1135,8 +1196,10 @@ mod fns {
}
#[op]
async fn log_info(state: Rc<RefCell<OpState>>, input: String) -> Result<(), AnyError> {
let state = state.borrow();
let ctx = state.borrow::<JsContext>().clone();
let ctx = {
let state = state.borrow();
state.borrow::<JsContext>().clone()
};
if let Some(rpc_client) = ctx.container_rpc_client {
return rpc_client
.request(
@@ -1169,9 +1232,16 @@ mod fns {
Ok(ctx.variable_args.clone())
}
#[op]
fn set_value(state: &mut OpState, value: Value) -> Result<(), AnyError> {
let mut answer = state.borrow::<AnswerState>().0.lock();
*answer = value;
async fn set_value(state: Rc<RefCell<OpState>>, value: Value) -> Result<(), AnyError> {
let sender = {
let state = state.borrow();
let answer_state = state.borrow::<AnswerState>().0.clone();
answer_state
};
sender
.send(value)
.await
.map_err(|_e| anyhow!("Could not set a value"))?;
Ok(())
}
#[op]
@@ -1424,28 +1494,54 @@ mod fns {
service_id: PackageId,
path: String,
callback: String,
) -> Result<ResultType, AnyError> {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
let sender = ctx.callback_sender.clone();
Ok(
match ctx
.os
.get_service_config(
service_id,
&path,
Box::new(move |value| {
sender
.send((callback.clone(), value))
.map_err(|_| RuntimeDropped)
}),
)
.await
{
Ok(a) => ResultType::Result(a),
Err(e) => ResultType::ErrorCode(e.kind as i32, e.source.to_string()),
},
) -> Result<Value, AnyError> {
let (sender, os) = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
(ctx.callback_sender.clone(), ctx.os.clone())
};
os.get_service_config(
service_id,
&path,
Box::new(move |value| {
sender
.send((callback.clone(), value))
.map_err(|_| RuntimeDropped)
}),
)
.await
.map_err(|e| anyhow!("Couldn't get service config: {e:?}"))
}
#[op]
async fn bind_onion(
state: Rc<RefCell<OpState>>,
internal_port: u16,
address_schema: AddressSchemaOnion,
) -> Result<helpers::Address, AnyError> {
let os = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
ctx.os.clone()
};
os.bind_onion(internal_port, address_schema)
.await
.map_err(|e| anyhow!("{e:?}"))
}
#[op]
async fn bind_local(
state: Rc<RefCell<OpState>>,
internal_port: u16,
address_schema: AddressSchemaLocal,
) -> Result<helpers::Address, AnyError> {
let os = {
let state = state.borrow();
let ctx = state.borrow::<JsContext>();
ctx.os.clone()
};
os.bind_local(internal_port, address_schema)
.await
.map_err(|e| anyhow!("{e:?}"))
}
/// We need to make sure that during the file accessing, we don't reach beyond our scope of control