From 21cf4cd2ce77f861d7a2e4b358279d9c1031f7fb Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Tue, 1 Nov 2022 14:01:42 -0600 Subject: [PATCH] make js cancellable (#1901) --- .../scripts/test-package/0.3.0.3/embassy.js | 4 +- libs/helpers/src/lib.rs | 101 ++++-------------- libs/js_engine/src/lib.rs | 7 +- 3 files changed, 28 insertions(+), 84 deletions(-) diff --git a/backend/test/js_action_execute/package-data/scripts/test-package/0.3.0.3/embassy.js b/backend/test/js_action_execute/package-data/scripts/test-package/0.3.0.3/embassy.js index 4e0c2cbff..729d47d19 100644 --- a/backend/test/js_action_execute/package-data/scripts/test-package/0.3.0.3/embassy.js +++ b/backend/test/js_action_execute/package-data/scripts/test-package/0.3.0.3/embassy.js @@ -749,8 +749,8 @@ export const action = { async slow(effects, _input) { while(true) { effects.error("A"); - // await ackermann(3,10); - await effects.sleep(100); + await ackermann(3,10); + // await effects.sleep(100); } }, diff --git a/libs/helpers/src/lib.rs b/libs/helpers/src/lib.rs index d70017592..5da97063c 100644 --- a/libs/helpers/src/lib.rs +++ b/libs/helpers/src/lib.rs @@ -1,14 +1,13 @@ use std::future::Future; use std::path::{Path, PathBuf}; -use std::pin::Pin; use std::time::Duration; use color_eyre::eyre::{eyre, Context, Error}; -use futures::future::{pending, BoxFuture}; +use futures::future::BoxFuture; use futures::FutureExt; use tokio::fs::File; use tokio::sync::oneshot; -use tokio::task::{JoinError, JoinHandle}; +use tokio::task::{JoinError, JoinHandle, LocalSet}; mod script_dir; pub use script_dir::*; @@ -210,79 +209,25 @@ impl TimedResource { } } -type SingThreadTask = futures::future::Select< - futures::future::Then< - oneshot::Receiver, - futures::future::Either, futures::future::Pending>, - fn( - Result, - ) - -> futures::future::Either, futures::future::Pending>, - >, - futures::future::Then< - JoinHandle<()>, - futures::future::Pending, - fn(Result<(), JoinError>) -> futures::future::Pending, - >, ->; - -#[pin_project::pin_project(PinnedDrop)] -pub struct SingleThreadJoinHandle { - abort: Option>, - #[pin] - task: SingThreadTask, -} -impl SingleThreadJoinHandle { - pub fn new>(fut: impl FnOnce() -> Fut + Send + 'static) -> Self { - let (abort, abort_recv) = oneshot::channel(); - let (return_val_send, return_val) = oneshot::channel(); - fn unwrap_recv_or_pending( - res: Result, - ) -> futures::future::Either, futures::future::Pending> - { - match res { - Ok(a) => futures::future::Either::Left(futures::future::ready(a)), - _ => futures::future::Either::Right(pending()), - } - } - fn make_pending(_: Result<(), JoinError>) -> futures::future::Pending { - pending() - } - Self { - abort: Some(abort), - task: futures::future::select( - return_val.then(unwrap_recv_or_pending), - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(async move { - tokio::select! { - _ = abort_recv.fuse() => (), - res = fut().fuse() => {let _error = return_val_send.send(res);}, - } - }) - }) - .then(make_pending), - ), - } - } -} - -impl Future for SingleThreadJoinHandle { - type Output = T; - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - let this = self.project(); - this.task.poll(cx).map(|t| t.factor_first().0) - } -} - -#[pin_project::pinned_drop] -impl PinnedDrop for SingleThreadJoinHandle { - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - if let Some(abort) = this.abort.take() { - let _error = abort.send(()); - } - } +pub async fn spawn_local< + T: 'static + Send, + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + 'static, +>( + fut: F, +) -> NonDetachingJoinHandle { + let (send, recv) = tokio::sync::oneshot::channel(); + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { + let set = LocalSet::new(); + send.send(set.spawn_local(fut()).into()) + .unwrap_or_else(|_| unreachable!()); + set.await + }) + }); + recv.await.unwrap() } diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index a1c104094..760a752d8 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -11,7 +11,7 @@ use deno_core::{ resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture, ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot, }; -use helpers::{script_dir, SingleThreadJoinHandle}; +use helpers::{script_dir, spawn_local}; use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -255,9 +255,8 @@ impl JsExecutionEnvironment { )); } }; - let safer_handle = - SingleThreadJoinHandle::new(move || self.execute(procedure_name, input, variable_args)); - let output = safer_handle.await?; + let safer_handle = spawn_local(|| self.execute(procedure_name, input, variable_args)).await; + let output = safer_handle.await.unwrap()?; match serde_json::from_value(output.clone()) { Ok(x) => Ok(x), Err(err) => {