From b4482775a900be392962ab1a4c2086d7152839d5 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 15:32:51 +1000 Subject: [PATCH] Ok all working again --- crates/neon/Cargo.toml | 2 +- crates/neon/src/async_local/mod.rs | 25 +++++--- crates/neon/src/async_local/runtime.rs | 5 +- crates/neon/src/async_local/waker.rs | 19 +++--- crates/neon/src/context/mod.rs | 83 +++++++++++++------------- crates/neon/src/handle/mod.rs | 7 ++- crates/neon/src/handle/root_value.rs | 42 +++++++------ crates/neon/src/prelude.rs | 4 +- 8 files changed, 104 insertions(+), 83 deletions(-) diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index 493498714..2d95bc26d 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -44,7 +44,7 @@ features = ["sync"] optional = true [features] -default = ["napi-8", "futures"] +default = ["napi-8"] # Enable extracting values by serializing to JSON serde = ["dep:serde", "dep:serde_json"] diff --git a/crates/neon/src/async_local/mod.rs b/crates/neon/src/async_local/mod.rs index 17f7a0ed2..81f4ef21b 100644 --- a/crates/neon/src/async_local/mod.rs +++ b/crates/neon/src/async_local/mod.rs @@ -17,20 +17,31 @@ use crate::sys; pub fn spawn_async_local<'a>( cx: &mut impl Context<'a>, future: impl Future + 'static, -) -> Result<(), ()> { +) { // Add a future to the future pool to be executed // whenever the Nodejs event loop is free to do so - LocalRuntime::queue_future(future).unwrap(); + LocalRuntime::queue_future(future); // If there are tasks in flight then the executor // is already running and should be reused if LocalRuntime::futures_count() > 1 { - return Ok(()); + return; } - // The futures executor runs on another thread and will - // use a threadsafe function to call schedule work - // on the JavaScript thread + // The futures executor runs on the main thread thread but + // the waker runs on another thread. + // + // The main thread executor will run the contained futures + // and as soon as they stall (e.g. waiting for a channel, timer, etc), + // the executor will immediately yield back to the JavaScript event loop. + // + // This "parks" the executer, which normally means the thread + // is block - however we cannot do that here so instead, there + // is a sacrificial "waker" thread who's only job is to sleep/wake and + // signal to Nodejs that futures need to be run. + // + // The waker thread notifies the main thread of pending work by + // running the futures executor within a threadsafe function let env_raw = cx.env().to_raw(); LocalWaker::send(WakerEvent::Init(unsafe { @@ -44,6 +55,4 @@ pub fn spawn_async_local<'a>( } }) })); - - Ok(()) } diff --git a/crates/neon/src/async_local/runtime.rs b/crates/neon/src/async_local/runtime.rs index f1a49d22d..764ea77ac 100644 --- a/crates/neon/src/async_local/runtime.rs +++ b/crates/neon/src/async_local/runtime.rs @@ -5,7 +5,6 @@ use super::executor::LocalPool; use super::executor::LocalSpawner; use super::executor::ThreadNotifyRef; use futures::task::LocalSpawnExt; -use futures::task::SpawnError; use once_cell::unsync::Lazy; thread_local! { @@ -21,12 +20,12 @@ impl LocalRuntime { Self::count() } - pub fn queue_future(future: impl Future + 'static) -> Result<(), SpawnError> { + pub fn queue_future(future: impl Future + 'static) { Self::increment(); SPAWNER.with(move |ls| ls.spawn_local(async move { future.await; Self::decrement(); - })) + })).expect("Unable to spawn future on local pool"); } pub fn run_until_stalled(thread_notify: ThreadNotifyRef) -> bool { diff --git a/crates/neon/src/async_local/waker.rs b/crates/neon/src/async_local/waker.rs index 70198294f..b8ea50f81 100644 --- a/crates/neon/src/async_local/waker.rs +++ b/crates/neon/src/async_local/waker.rs @@ -22,18 +22,20 @@ pub enum WakerEvent { } /// The futures waker that coordinates with the futures executor to notify -/// the main thread to pause and resume execution of futures. +/// the main thread to resume execution of futures. /// /// The waker is implemented as a dedicated system thread which is parked -/// by the local futures executor while waiting for futures to resume work. +/// by the local futures executor. Futures (like channel, timers) will +/// call the wake() method Futures Waker trait. /// -/// Once woken up, the waker resumes execution of futures on the JavaScript -/// thread by triggering a napi threadsafe function to poll the futures in -/// the local pool until no more progress can be made before yielding back -/// to the Nodejs event loop. +/// This gives it some level of portability - for instance any utilities +/// from the "async_std" crate will work however most things from Tokio +/// won't work. /// -/// This allows for the execution of Rust futures to integrate with the -/// Nodejs event loop without blocking either +/// Once woken up, the waker resumes execution of futures on the JavaScript +/// thread by triggering a napi threadsafe function which executes a callback +/// that runs on the main JavaScript thread. This callback is used to poll +/// the futures in the local pool. pub struct LocalWaker; impl LocalWaker { @@ -46,7 +48,6 @@ impl LocalWaker { fn start_waker_thread() -> Sender { let (tx, rx) = channel(); - // Dedicated waker thread to use for waiting on pending futures thread::spawn(move || { let thread_notify = ThreadNotify::new(); let mut handle = None::; diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index fcc3d9fea..1941941f9 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -145,10 +145,10 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe}; pub use crate::types::buffer::lock::Lock; -// #[cfg(feature = "async_local")] -// use crate::async_local::{root::RootGlobal, spawn_async_local}; #[cfg(feature = "async_local")] use futures::Future; +#[cfg(feature = "async_local")] +use crate::handle::StaticHandle; use crate::{ event::TaskBuilder, @@ -290,22 +290,22 @@ pub trait Context<'a>: ContextInternal<'a> { result } + /// Execute a future on the local JavaScript thread. This does not block JavaScript execution. + /// + /// Note: Avoid doing heavy computation on the main thread. The intended use case for this is + /// waiting on channel receivers for data coming from other threads, waiting on timers and + /// handling async behaviors from JavaScript. #[cfg(feature = "async_local")] - fn execute_async_local(&mut self, f: F) + fn spawn_local(&mut self, f: F) where Fut: Future, F: FnOnce(AsyncContext) -> Fut + 'static, { - use futures::Future; - let env = self.env(); - crate::async_local::spawn_async_local(self, async move { - // let scope = unsafe { HandleScope::new(env.to_raw()) }; let future = f(AsyncContext { env }); future.await; - // drop(scope); - }).unwrap(); + }); } /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. @@ -623,29 +623,31 @@ impl<'a> ModuleContext<'a> { F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy, V: Value, { - // let wrapper = JsFunction::new(self, move |mut cx| { - // let mut args = vec![]; - - // while let Some(arg) = cx.argument_opt(args.len()) { - // let arg = arg.as_value(&mut cx); - // let arg = RootGlobal::new(&mut cx, arg); - // args.push(arg); - // } - - // let (deferred, promise) = cx.promise(); - // cx.execute_async_local(move |mut cx| async move { - // let acx = AsyncFunctionContext { - // env: cx.env(), - // arguments: args, - // }; - // deferred.resolve(&mut cx, f(acx).await.unwrap()); - // () - // }); - - // Ok(promise) - // })?; - - // self.exports.clone().set(self, key, wrapper)?; + use crate::handle::StaticHandle; + + let wrapper = JsFunction::new(self, move |mut cx| { + let mut args = vec![]; + + while let Some(arg) = cx.argument_opt(args.len()) { + let arg = arg.as_value(&mut cx); + let arg = StaticHandle::new(&mut cx, arg)?; + args.push(arg); + } + + let (deferred, promise) = cx.promise(); + cx.spawn_local(move |mut cx| async move { + let acx = AsyncFunctionContext { + env: cx.env(), + arguments: args, + }; + deferred.resolve(&mut cx, f(acx).await.unwrap()); + () + }); + + Ok(promise) + })?; + + self.exports.clone().set(self, key, wrapper)?; Ok(()) } @@ -851,16 +853,17 @@ impl<'a> Context<'a> for FunctionContext<'a> {} #[cfg(feature = "async_local")] pub struct AsyncFunctionContext { env: Env, - // arguments: Vec, + arguments: Vec>, } #[cfg(feature = "async_local")] impl<'a> AsyncFunctionContext { pub fn argument(&mut self, i: usize) -> JsResult<'a, V> { - // let arg = self.arguments.get(i).unwrap().clone(); - // let handle = arg.into_inner(self); - // Ok(handle) - todo!() + let arg = self.arguments.get(i).unwrap().clone(); + let arg = arg.from_static(self)?; + let value = unsafe { V::from_local(self.env(), arg.to_local()) }; + let handle = Handle::new_internal(value); + Ok(handle) } } @@ -877,9 +880,9 @@ impl<'a> Context<'a> for AsyncFunctionContext {} #[cfg(feature = "async_local")] impl Drop for AsyncFunctionContext { fn drop(&mut self) { - // while let Some(arg) = self.arguments.pop() { - // arg.remove(self); - // } + while let Some(arg) = self.arguments.pop() { + arg.drop(self).unwrap(); + } } } diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index f760d7129..7974257e2 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -96,8 +96,11 @@ impl<'a, V: Value + 'a> Handle<'a, V> { } } - pub fn root_global(self, cx: &mut impl Context<'a>) -> NeonResult> { - RootGlobal::new(cx, self) + /// Detaches the value from the Nodejs garbage collector + /// and manages the variable lifetime via reference counting. + /// Useful when interacting with a value within async closures + pub fn to_static(self, cx: &mut impl Context<'a>) -> NeonResult> { + StaticHandle::new(cx, self) } } diff --git a/crates/neon/src/handle/root_value.rs b/crates/neon/src/handle/root_value.rs index 83aabb533..ad838c4d2 100644 --- a/crates/neon/src/handle/root_value.rs +++ b/crates/neon/src/handle/root_value.rs @@ -12,24 +12,28 @@ use crate::result::JsResult; use crate::result::NeonResult; use crate::types::JsFunction; use crate::types::JsObject; +use crate::types::JsSymbol; + +// This creates a rooted object and stores javascript +// values on it as a way to grant any JavaScript value +// a static lifetime thread_local! { - // Symbol("__neon_cache") static NEON_CACHE: OnceCell> = OnceCell::default(); } /// Reference counted JavaScript value with a static lifetime for use in async closures -pub struct RootGlobal { +pub struct StaticHandle { pub(crate) count: Rc>, - pub(crate) inner: Rc, + pub(crate) inner: Rc>, _p: PhantomData, } -impl RootGlobal { +impl StaticHandle { pub(crate) fn new<'a>( cx: &mut impl Context<'a>, value: Handle<'a, T>, - ) -> NeonResult> { + ) -> NeonResult> { Ok(Self { count: Rc::new(RefCell::new(1)), inner: Rc::new(set_ref(cx, value)?), @@ -37,7 +41,7 @@ impl RootGlobal { }) } - pub fn clone<'a>(&self) -> RootGlobal { + pub fn clone(&self) -> StaticHandle { let mut count = self.count.borrow_mut(); *count += 1; drop(count); @@ -49,8 +53,8 @@ impl RootGlobal { } } - pub fn into_inner<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { - get_ref(cx, &*self.inner) + pub fn from_static<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { + get_ref(cx, &self.inner) } pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> { @@ -58,7 +62,7 @@ impl RootGlobal { *count -= 1; if *count == 0 { - delete_ref(cx, &*self.inner)? + delete_ref(cx, &self.inner)? } Ok(()) @@ -81,41 +85,43 @@ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { Ok(neon_cache.into_inner(cx)) } -fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult { +fn set_ref<'a, V: Value>( + cx: &mut impl Context<'a>, + value: Handle<'a, V>, +) -> NeonResult> { let neon_cache = get_cache(cx)?; - // Is this safe? - let key = format!("{:?}", value.to_local()); + let symbol = cx.symbol(format!("{:?}", value.to_local())).root(cx); get_cache(cx)? .get::(cx, "set")? .call_with(cx) .this(neon_cache) - .arg(cx.string(&key)) + .arg(symbol.clone(cx).into_inner(cx)) .arg(value) .exec(cx)?; - Ok(key) + Ok(symbol) } -fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &str) -> JsResult<'a, V> { +fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &Root) -> JsResult<'a, V> { let neon_cache = get_cache(cx)?; get_cache(cx)? .get::(cx, "get")? .call_with(cx) .this(neon_cache) - .arg(cx.string(&key)) + .arg(key.clone(cx).into_inner(cx)) .apply(cx) } -fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &str) -> NeonResult<()> { +fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &Root) -> NeonResult<()> { let neon_cache = get_cache(cx)?; get_cache(cx)? .get::(cx, "delete")? .call_with(cx) .this(neon_cache) - .arg(cx.string(&key)) + .arg(key.clone(cx).into_inner(cx)) .exec(cx)?; Ok(()) diff --git a/crates/neon/src/prelude.rs b/crates/neon/src/prelude.rs index 01b87f55e..837dc5ca1 100644 --- a/crates/neon/src/prelude.rs +++ b/crates/neon/src/prelude.rs @@ -3,8 +3,8 @@ #[doc(no_inline)] pub use crate::{ context::{ - CallKind, ComputeContext, Context, ExecuteContext, FunctionContext, ModuleContext, - TaskContext, + AsyncContext, AsyncFunctionContext, CallKind, ComputeContext, Context, ExecuteContext, + FunctionContext, ModuleContext, TaskContext, }, handle::{Handle, Root}, object::Object,