make js cancellable (#1901)

This commit is contained in:
Aiden McClelland
2022-11-01 14:01:42 -06:00
parent defc98ab0e
commit 21cf4cd2ce
3 changed files with 28 additions and 84 deletions

View File

@@ -749,8 +749,8 @@ export const action = {
async slow(effects, _input) { async slow(effects, _input) {
while(true) { while(true) {
effects.error("A"); effects.error("A");
// await ackermann(3,10); await ackermann(3,10);
await effects.sleep(100); // await effects.sleep(100);
} }
}, },

View File

@@ -1,14 +1,13 @@
use std::future::Future; use std::future::Future;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use color_eyre::eyre::{eyre, Context, Error}; use color_eyre::eyre::{eyre, Context, Error};
use futures::future::{pending, BoxFuture}; use futures::future::BoxFuture;
use futures::FutureExt; use futures::FutureExt;
use tokio::fs::File; use tokio::fs::File;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::task::{JoinError, JoinHandle}; use tokio::task::{JoinError, JoinHandle, LocalSet};
mod script_dir; mod script_dir;
pub use script_dir::*; pub use script_dir::*;
@@ -210,79 +209,25 @@ impl<T: 'static + Send> TimedResource<T> {
} }
} }
type SingThreadTask<T> = futures::future::Select< pub async fn spawn_local<
futures::future::Then< T: 'static + Send,
oneshot::Receiver<T>, F: FnOnce() -> Fut + Send + 'static,
futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>, Fut: Future<Output = T> + 'static,
fn( >(
Result<T, oneshot::error::RecvError>, fut: F,
) ) -> NonDetachingJoinHandle<T> {
-> futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>, let (send, recv) = tokio::sync::oneshot::channel();
>, std::thread::spawn(move || {
futures::future::Then< tokio::runtime::Builder::new_current_thread()
JoinHandle<()>, .enable_all()
futures::future::Pending<T>, .build()
fn(Result<(), JoinError>) -> futures::future::Pending<T>, .unwrap()
>, .block_on(async move {
>; let set = LocalSet::new();
send.send(set.spawn_local(fut()).into())
#[pin_project::pin_project(PinnedDrop)] .unwrap_or_else(|_| unreachable!());
pub struct SingleThreadJoinHandle<T> { set.await
abort: Option<oneshot::Sender<()>>, })
#[pin] });
task: SingThreadTask<T>, recv.await.unwrap()
}
impl<T: Send + 'static> SingleThreadJoinHandle<T> {
pub fn new<Fut: Future<Output = T>>(fut: impl FnOnce() -> Fut + Send + 'static) -> Self {
let (abort, abort_recv) = oneshot::channel();
let (return_val_send, return_val) = oneshot::channel();
fn unwrap_recv_or_pending<T>(
res: Result<T, oneshot::error::RecvError>,
) -> futures::future::Either<futures::future::Ready<T>, futures::future::Pending<T>>
{
match res {
Ok(a) => futures::future::Either::Left(futures::future::ready(a)),
_ => futures::future::Either::Right(pending()),
}
}
fn make_pending<T>(_: Result<(), JoinError>) -> futures::future::Pending<T> {
pending()
}
Self {
abort: Some(abort),
task: futures::future::select(
return_val.then(unwrap_recv_or_pending),
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
tokio::select! {
_ = abort_recv.fuse() => (),
res = fut().fuse() => {let _error = return_val_send.send(res);},
}
})
})
.then(make_pending),
),
}
}
}
impl<T: Send> Future for SingleThreadJoinHandle<T> {
type Output = T;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.task.poll(cx).map(|t| t.factor_first().0)
}
}
#[pin_project::pinned_drop]
impl<T> PinnedDrop for SingleThreadJoinHandle<T> {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if let Some(abort) = this.abort.take() {
let _error = abort.send(());
}
}
} }

View File

@@ -11,7 +11,7 @@ use deno_core::{
resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture, resolve_import, Extension, JsRuntime, ModuleLoader, ModuleSource, ModuleSourceFuture,
ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot, ModuleSpecifier, ModuleType, OpDecl, RuntimeOptions, Snapshot,
}; };
use helpers::{script_dir, SingleThreadJoinHandle}; use helpers::{script_dir, spawn_local};
use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId}; use models::{ExecCommand, PackageId, ProcedureName, TermCommand, Version, VolumeId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
@@ -255,9 +255,8 @@ impl JsExecutionEnvironment {
)); ));
} }
}; };
let safer_handle = let safer_handle = spawn_local(|| self.execute(procedure_name, input, variable_args)).await;
SingleThreadJoinHandle::new(move || self.execute(procedure_name, input, variable_args)); let output = safer_handle.await.unwrap()?;
let output = safer_handle.await?;
match serde_json::from_value(output.clone()) { match serde_json::from_value(output.clone()) {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(err) => { Err(err) => {