diff --git a/.gitignore b/.gitignore index 740828832..d012485bd 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ pkgs/create-neon/create-neon-manual-test-project test/cli/lib npm-debug.log rls*.log +.vscode diff --git a/Cargo.lock b/Cargo.lock index 065d7b326..b26cfc035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,7 +73,7 @@ version = "0.65.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" dependencies = [ - "bitflags 1.3.2", + "bitflags 1.2.1", "cexpr", "clang-sys", "lazy_static", @@ -92,9 +92,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "1.3.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "bitflags" @@ -178,6 +178,95 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.57", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getrandom" version = "0.2.11" @@ -377,6 +466,7 @@ dependencies = [ "aquamarine", "doc-comment", "easy-cast", + "futures", "getrandom", "libloading 0.8.1", "linkify", @@ -506,6 +596,12 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -712,6 +808,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.2" diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index d15b86396..2d95bc26d 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -35,6 +35,7 @@ doc-comment = { version = "0.3.3", optional = true } send_wrapper = "0.6.0" serde = { version = "1.0.197", optional = true } serde_json = { version = "1.0.114", optional = true } +futures = { version = "0.3", optional = true } [dependencies.tokio] version = "1.34.0" @@ -57,6 +58,8 @@ external-buffers = [] # https://github.com/neon-bindings/rfcs/pull/46 futures = ["tokio"] +async_local = ["dep:futures"] + # Enable low-level system APIs. The `sys` API allows augmenting the Neon API # from external crates. sys = [] diff --git a/crates/neon/src/async_local/executor/enter.rs b/crates/neon/src/async_local/executor/enter.rs new file mode 100644 index 000000000..24063e9f8 --- /dev/null +++ b/crates/neon/src/async_local/executor/enter.rs @@ -0,0 +1,53 @@ +use std::cell::Cell; +use std::fmt; + +std::thread_local!(static ENTERED: Cell = Cell::new(false)); + +pub struct Enter { + _priv: (), +} + +pub struct EnterError { + _priv: (), +} + +impl fmt::Debug for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EnterError").finish() + } +} + +impl fmt::Display for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "an execution scope has already been entered") + } +} + +impl std::error::Error for EnterError {} + +pub fn enter() -> Result { + ENTERED.with(|c| { + if c.get() { + Err(EnterError { _priv: () }) + } else { + c.set(true); + + Ok(Enter { _priv: () }) + } + }) +} + +impl fmt::Debug for Enter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for Enter { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(c.get()); + c.set(false); + }); + } +} diff --git a/crates/neon/src/async_local/executor/local_pool.rs b/crates/neon/src/async_local/executor/local_pool.rs new file mode 100644 index 000000000..6599c2ebc --- /dev/null +++ b/crates/neon/src/async_local/executor/local_pool.rs @@ -0,0 +1,230 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::rc::Weak; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; +use std::thread::Thread; +use std::vec::Vec; + +use futures::stream::FuturesUnordered; +use futures::stream::StreamExt; +use futures::task::waker_ref; +use futures::task::ArcWake; +use futures::task::Context; +use futures::task::FutureObj; +use futures::task::LocalFutureObj; +use futures::task::LocalSpawn; +use futures::task::Poll; +use futures::task::Spawn; +use futures::task::SpawnError; + +use super::enter::enter; + +#[derive(Debug)] +pub struct LocalPool { + pool: FuturesUnordered>, + incoming: Rc, +} + +#[derive(Clone, Debug)] +pub struct LocalSpawner { + incoming: Weak, +} + +type Incoming = RefCell>>; + +#[derive(Debug)] +pub(crate) struct ThreadNotify { + pub(crate) thread: Thread, + pub(crate) unparked: AtomicBool, +} + +pub type ThreadNotifyRef = Arc; + +impl ThreadNotify { + pub fn new() -> Arc { + Arc::new(ThreadNotify { + thread: thread::current(), + unparked: AtomicBool::new(false), + }) + } +} + +impl ArcWake for ThreadNotify { + fn wake_by_ref(arc_self: &Arc) { + let unparked = arc_self.unparked.swap(true, Ordering::Release); + if !unparked { + arc_self.thread.unpark(); + } + } +} + +pub fn wait_for_wake(thread_notify: &ThreadNotify) { + while !thread_notify.unparked.swap(false, Ordering::Acquire) { + std::thread::park(); + } +} + +fn woken(thread_notify: &ThreadNotify) -> bool { + thread_notify.unparked.load(Ordering::Acquire) +} + +fn run_executor) -> Poll>( + thread_notify: Arc, + mut f: F, +) -> T { + let _enter = enter().expect( + "cannot execute `LocalPool` executor from within \ + another executor", + ); + + let waker = waker_ref(&thread_notify); + let mut cx = Context::from_waker(&waker); + + loop { + if let Poll::Ready(t) = f(&mut cx) { + return t; + } + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { + // No wakeup occurred. It may occur now, right before parking, + // but in that case the token made available by `unpark()` + // is guaranteed to still be available and `park()` is a no-op. + thread::park(); + } + } +} + +impl LocalPool { + pub fn new() -> Self { + Self { + pool: FuturesUnordered::new(), + incoming: Default::default(), + } + } + + pub fn spawner(&self) -> LocalSpawner { + LocalSpawner { + incoming: Rc::downgrade(&self.incoming), + } + } + + // Note: Can be used to interleave futures with the JS event loop + /// Runs all tasks and returns after completing one future or until no more progress + /// can be made. Returns `true` if one future was completed, `false` otherwise. + #[allow(unused)] + pub fn try_run_one(&mut self, thread_notify: Arc) -> bool { + run_executor(thread_notify.clone(), |cx| { + loop { + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), + } + + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken(&thread_notify) { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); + } + } + }) + } + + /// Runs all tasks in the pool and returns if no more progress can be made on any task. + #[allow(unused)] + pub fn run_until_stalled(&mut self, thread_notify: Arc) { + run_executor(thread_notify.clone(), |cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken(&thread_notify) { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } + }); + } + + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { + loop { + self.drain_incoming(); + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. + if !self.incoming.borrow().is_empty() { + continue; + } + + match pool_ret { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + } + } + + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) + } + } +} + +impl Default for LocalPool { + fn default() -> Self { + Self::new() + } +} + +impl Spawn for LocalSpawner { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future.into()); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} + +impl LocalSpawn for LocalSpawner { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status_local(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} diff --git a/crates/neon/src/async_local/executor/mod.rs b/crates/neon/src/async_local/executor/mod.rs new file mode 100644 index 000000000..3a933d0eb --- /dev/null +++ b/crates/neon/src/async_local/executor/mod.rs @@ -0,0 +1,4 @@ +mod enter; +mod local_pool; + +pub use self::local_pool::*; diff --git a/crates/neon/src/async_local/mod.rs b/crates/neon/src/async_local/mod.rs new file mode 100644 index 000000000..81f4ef21b --- /dev/null +++ b/crates/neon/src/async_local/mod.rs @@ -0,0 +1,58 @@ +mod executor; +mod runtime; +mod waker; + +use std::future::Future; + +use executor::ThreadNotifyRef; +use waker::LocalWaker; +use waker::WakerEvent; + +use self::runtime::LocalRuntime; +use crate::context::Context; +use crate::sys; + +/// Schedule a future to run asynchronously on the local JavaScript thread. +/// The future's execution will not block the local thread. +pub fn spawn_async_local<'a>( + cx: &mut impl Context<'a>, + future: impl Future + 'static, +) { + // Add a future to the future pool to be executed + // whenever the Nodejs event loop is free to do so + LocalRuntime::queue_future(future); + + // If there are tasks in flight then the executor + // is already running and should be reused + if LocalRuntime::futures_count() > 1 { + return; + } + + // The futures executor runs on the main thread thread but + // the waker runs on another thread. + // + // The main thread executor will run the contained futures + // and as soon as they stall (e.g. waiting for a channel, timer, etc), + // the executor will immediately yield back to the JavaScript event loop. + // + // This "parks" the executer, which normally means the thread + // is block - however we cannot do that here so instead, there + // is a sacrificial "waker" thread who's only job is to sleep/wake and + // signal to Nodejs that futures need to be run. + // + // The waker thread notifies the main thread of pending work by + // running the futures executor within a threadsafe function + let env_raw = cx.env().to_raw(); + + LocalWaker::send(WakerEvent::Init(unsafe { + sys::tsfn::ThreadsafeFunction::::new(env_raw, |_, thread_notify| { + let done = LocalRuntime::run_until_stalled(thread_notify); + + if done { + LocalWaker::send(WakerEvent::Done); + } else { + LocalWaker::send(WakerEvent::Next); + } + }) + })); +} diff --git a/crates/neon/src/async_local/runtime.rs b/crates/neon/src/async_local/runtime.rs new file mode 100644 index 000000000..764ea77ac --- /dev/null +++ b/crates/neon/src/async_local/runtime.rs @@ -0,0 +1,51 @@ +use std::cell::RefCell; +use std::future::Future; + +use super::executor::LocalPool; +use super::executor::LocalSpawner; +use super::executor::ThreadNotifyRef; +use futures::task::LocalSpawnExt; +use once_cell::unsync::Lazy; + +thread_local! { + static LOCAL_POOL: Lazy> = Lazy::default(); + static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner())); + static FUTURES_COUNT: Lazy> = Lazy::default(); +} + +pub struct LocalRuntime; + +impl LocalRuntime { + pub fn futures_count() -> usize { + Self::count() + } + + pub fn queue_future(future: impl Future + 'static) { + Self::increment(); + SPAWNER.with(move |ls| ls.spawn_local(async move { + future.await; + Self::decrement(); + })).expect("Unable to spawn future on local pool"); + } + + pub fn run_until_stalled(thread_notify: ThreadNotifyRef) -> bool { + LOCAL_POOL.with(move |lp| lp.borrow_mut().run_until_stalled(thread_notify)); + if Self::count() == 0 { + true + } else { + false + } + } + + fn count() -> usize { + FUTURES_COUNT.with(|c| *c.borrow_mut()) + } + + fn increment() { + FUTURES_COUNT.with(|c| *c.borrow_mut() += 1); + } + + fn decrement() { + FUTURES_COUNT.with(|c| *c.borrow_mut() -= 1); + } +} diff --git a/crates/neon/src/async_local/waker.rs b/crates/neon/src/async_local/waker.rs new file mode 100644 index 000000000..b8ea50f81 --- /dev/null +++ b/crates/neon/src/async_local/waker.rs @@ -0,0 +1,84 @@ +use std::sync::mpsc::channel; +use std::sync::mpsc::Sender; +use std::thread; + +use crate::sys; + +use super::executor::wait_for_wake; +use super::executor::ThreadNotify; +use super::executor::ThreadNotifyRef; +use once_cell::unsync::Lazy; + +thread_local! { + static WAKER_THREAD: Lazy> = Lazy::new(LocalWaker::start_waker_thread); +} + +pub type WakerInit = sys::tsfn::ThreadsafeFunction; + +pub enum WakerEvent { + Init(WakerInit), + Next, + Done, +} + +/// The futures waker that coordinates with the futures executor to notify +/// the main thread to resume execution of futures. +/// +/// The waker is implemented as a dedicated system thread which is parked +/// by the local futures executor. Futures (like channel, timers) will +/// call the wake() method Futures Waker trait. +/// +/// This gives it some level of portability - for instance any utilities +/// from the "async_std" crate will work however most things from Tokio +/// won't work. +/// +/// Once woken up, the waker resumes execution of futures on the JavaScript +/// thread by triggering a napi threadsafe function which executes a callback +/// that runs on the main JavaScript thread. This callback is used to poll +/// the futures in the local pool. +pub struct LocalWaker; + +impl LocalWaker { + pub fn send(event: WakerEvent) { + WAKER_THREAD + .with(|tx| tx.send(event)) + .expect("Unable to communicate with waker"); + } + + fn start_waker_thread() -> Sender { + let (tx, rx) = channel(); + + thread::spawn(move || { + let thread_notify = ThreadNotify::new(); + let mut handle = None::; + + while let Ok(event) = rx.recv() { + match event { + WakerEvent::Init(incoming) => { + if handle.replace(incoming).is_some() { + panic!("Handle already init"); + }; + let Some(ref handle) = handle else { + panic!("No handle"); + }; + handle.call(thread_notify.clone(), None).ok(); + } + WakerEvent::Next => { + wait_for_wake(&thread_notify); + let Some(ref handle) = handle else { + panic!("No handle"); + }; + handle.call(thread_notify.clone(), None).ok(); + } + WakerEvent::Done => { + if let Some(handle) = handle.take() { + drop(handle); + } + } + }; + } + }); + + tx + } +} diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index 0d936ed3f..1941941f9 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -145,6 +145,11 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe}; pub use crate::types::buffer::lock::Lock; +#[cfg(feature = "async_local")] +use futures::Future; +#[cfg(feature = "async_local")] +use crate::handle::StaticHandle; + use crate::{ event::TaskBuilder, handle::Handle, @@ -155,12 +160,7 @@ use crate::{ scope::{EscapableHandleScope, HandleScope}, }, types::{ - boxed::{Finalize, JsBox}, - error::JsError, - extract::FromArgs, - private::ValueInternal, - Deferred, JsArray, JsArrayBuffer, JsBoolean, JsBuffer, JsFunction, JsNull, JsNumber, - JsObject, JsPromise, JsString, JsUndefined, JsValue, StringResult, Value, + boxed::{Finalize, JsBox}, error::JsError, extract::FromArgs, private::ValueInternal, Deferred, JsArray, JsArrayBuffer, JsBoolean, JsBuffer, JsFunction, JsNull, JsNumber, JsObject, JsPromise, JsString, JsSymbol, JsUndefined, JsValue, StringResult, Value }, }; @@ -290,6 +290,24 @@ pub trait Context<'a>: ContextInternal<'a> { result } + /// Execute a future on the local JavaScript thread. This does not block JavaScript execution. + /// + /// Note: Avoid doing heavy computation on the main thread. The intended use case for this is + /// waiting on channel receivers for data coming from other threads, waiting on timers and + /// handling async behaviors from JavaScript. + #[cfg(feature = "async_local")] + fn spawn_local(&mut self, f: F) + where + Fut: Future, + F: FnOnce(AsyncContext) -> Fut + 'static, + { + let env = self.env(); + crate::async_local::spawn_async_local(self, async move { + let future = f(AsyncContext { env }); + future.await; + }); + } + /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. /// /// Handles created in the new scope are kept alive only for the duration of the computation and cannot escape, with the exception of the result value, which is rooted in the outer context. @@ -336,6 +354,11 @@ pub trait Context<'a>: ContextInternal<'a> { JsNumber::new(self, x.into()) } + /// Convenience method for creating a `JsSymbol` value. + fn symbol>(&mut self, d: D) -> Handle<'a, JsSymbol> { + JsSymbol::new(self, d) + } + /// Convenience method for creating a `JsString` value. /// /// If the string exceeds the limits of the JS engine, this method panics. @@ -593,6 +616,41 @@ impl<'a> ModuleContext<'a> { Ok(()) } + #[cfg(feature = "async_local")] + pub fn export_function_async<'b, F, V, Fut>(&mut self, key: &str, f: F) -> NeonResult<()> + where + Fut: Future>, + F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy, + V: Value, + { + use crate::handle::StaticHandle; + + let wrapper = JsFunction::new(self, move |mut cx| { + let mut args = vec![]; + + while let Some(arg) = cx.argument_opt(args.len()) { + let arg = arg.as_value(&mut cx); + let arg = StaticHandle::new(&mut cx, arg)?; + args.push(arg); + } + + let (deferred, promise) = cx.promise(); + cx.spawn_local(move |mut cx| async move { + let acx = AsyncFunctionContext { + env: cx.env(), + arguments: args, + }; + deferred.resolve(&mut cx, f(acx).await.unwrap()); + () + }); + + Ok(promise) + })?; + + self.exports.clone().set(self, key, wrapper)?; + Ok(()) + } + /// Exports a JavaScript value from a Neon module. pub fn export_value(&mut self, key: &str, val: Handle) -> NeonResult<()> { self.exports.clone().set(self, key, val)?; @@ -627,6 +685,22 @@ impl<'a> ContextInternal<'a> for ExecuteContext<'a> { impl<'a> Context<'a> for ExecuteContext<'a> {} +/// An execution context of a scope created by [`Context::execute_async()`](Context::execute_async). +#[cfg(feature = "async_local")] +pub struct AsyncContext { + env: Env, +} + +#[cfg(feature = "async_local")] +impl<'a> ContextInternal<'static> for AsyncContext { + fn env(&self) -> Env { + self.env + } +} + +#[cfg(feature = "async_local")] +impl Context<'static> for AsyncContext {} + /// An execution context of a scope created by [`Context::compute_scoped()`](Context::compute_scoped). pub struct ComputeContext<'a> { env: Env, @@ -773,6 +847,45 @@ impl<'a> ContextInternal<'a> for FunctionContext<'a> { impl<'a> Context<'a> for FunctionContext<'a> {} +/// An execution context of an async function call. +/// +/// The type parameter `T` is the type of the `this`-binding. +#[cfg(feature = "async_local")] +pub struct AsyncFunctionContext { + env: Env, + arguments: Vec>, +} + +#[cfg(feature = "async_local")] +impl<'a> AsyncFunctionContext { + pub fn argument(&mut self, i: usize) -> JsResult<'a, V> { + let arg = self.arguments.get(i).unwrap().clone(); + let arg = arg.from_static(self)?; + let value = unsafe { V::from_local(self.env(), arg.to_local()) }; + let handle = Handle::new_internal(value); + Ok(handle) + } +} + +#[cfg(feature = "async_local")] +impl<'a> ContextInternal<'a> for AsyncFunctionContext { + fn env(&self) -> Env { + self.env + } +} + +#[cfg(feature = "async_local")] +impl<'a> Context<'a> for AsyncFunctionContext {} + +#[cfg(feature = "async_local")] +impl Drop for AsyncFunctionContext { + fn drop(&mut self) { + while let Some(arg) = self.arguments.pop() { + arg.drop(self).unwrap(); + } + } +} + /// An execution context of a task completion callback. pub struct TaskContext<'a> { env: Env, diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index ee6a91a9c..7974257e2 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -48,7 +48,8 @@ pub(crate) mod internal; -pub(crate) mod root; +pub(crate) mod root_object; +pub(crate) mod root_value; use std::{ error::Error, @@ -58,12 +59,13 @@ use std::{ ops::{Deref, DerefMut}, }; -pub use self::root::Root; +pub use self::root_object::Root; +pub use self::root_value::*; use crate::{ context::Context, handle::internal::{SuperType, TransparentNoCopyWrapper}, - result::{JsResult, ResultExt}, + result::{JsResult, NeonResult, ResultExt}, sys, types::Value, }; @@ -93,6 +95,13 @@ impl<'a, V: Value + 'a> Handle<'a, V> { phantom: PhantomData, } } + + /// Detaches the value from the Nodejs garbage collector + /// and manages the variable lifetime via reference counting. + /// Useful when interacting with a value within async closures + pub fn to_static(self, cx: &mut impl Context<'a>) -> NeonResult> { + StaticHandle::new(cx, self) + } } /// An error representing a failed downcast. diff --git a/crates/neon/src/handle/root.rs b/crates/neon/src/handle/root_object.rs similarity index 100% rename from crates/neon/src/handle/root.rs rename to crates/neon/src/handle/root_object.rs diff --git a/crates/neon/src/handle/root_value.rs b/crates/neon/src/handle/root_value.rs new file mode 100644 index 000000000..ad838c4d2 --- /dev/null +++ b/crates/neon/src/handle/root_value.rs @@ -0,0 +1,128 @@ +use once_cell::unsync::OnceCell; +use std::cell::RefCell; +use std::marker::PhantomData; +use std::rc::Rc; + +use super::Root; +use super::Value; +use crate::context::Context; +use crate::object::Object; +use crate::prelude::Handle; +use crate::result::JsResult; +use crate::result::NeonResult; +use crate::types::JsFunction; +use crate::types::JsObject; +use crate::types::JsSymbol; + +// This creates a rooted object and stores javascript +// values on it as a way to grant any JavaScript value +// a static lifetime + +thread_local! { + static NEON_CACHE: OnceCell> = OnceCell::default(); +} + +/// Reference counted JavaScript value with a static lifetime for use in async closures +pub struct StaticHandle { + pub(crate) count: Rc>, + pub(crate) inner: Rc>, + _p: PhantomData, +} + +impl StaticHandle { + pub(crate) fn new<'a>( + cx: &mut impl Context<'a>, + value: Handle<'a, T>, + ) -> NeonResult> { + Ok(Self { + count: Rc::new(RefCell::new(1)), + inner: Rc::new(set_ref(cx, value)?), + _p: Default::default(), + }) + } + + pub fn clone(&self) -> StaticHandle { + let mut count = self.count.borrow_mut(); + *count += 1; + drop(count); + + Self { + count: self.count.clone(), + inner: self.inner.clone(), + _p: self._p.clone(), + } + } + + pub fn from_static<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { + get_ref(cx, &self.inner) + } + + pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> { + let mut count = self.count.borrow_mut(); + *count -= 1; + + if *count == 0 { + delete_ref(cx, &self.inner)? + } + + Ok(()) + } +} + +fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { + let neon_cache = NEON_CACHE.with({ + |raw_value| { + raw_value + .get_or_try_init(|| -> NeonResult> { + let set_ctor = cx.global_object().get::(cx, "Map")?; + let neon_cache = set_ctor.construct(cx, &[])?; + Ok(neon_cache.root(cx)) + }) + .and_then(|e| Ok(e.clone(cx))) + } + })?; + + Ok(neon_cache.into_inner(cx)) +} + +fn set_ref<'a, V: Value>( + cx: &mut impl Context<'a>, + value: Handle<'a, V>, +) -> NeonResult> { + let neon_cache = get_cache(cx)?; + let symbol = cx.symbol(format!("{:?}", value.to_local())).root(cx); + + get_cache(cx)? + .get::(cx, "set")? + .call_with(cx) + .this(neon_cache) + .arg(symbol.clone(cx).into_inner(cx)) + .arg(value) + .exec(cx)?; + + Ok(symbol) +} + +fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &Root) -> JsResult<'a, V> { + let neon_cache = get_cache(cx)?; + + get_cache(cx)? + .get::(cx, "get")? + .call_with(cx) + .this(neon_cache) + .arg(key.clone(cx).into_inner(cx)) + .apply(cx) +} + +fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &Root) -> NeonResult<()> { + let neon_cache = get_cache(cx)?; + + get_cache(cx)? + .get::(cx, "delete")? + .call_with(cx) + .this(neon_cache) + .arg(key.clone(cx).into_inner(cx)) + .exec(cx)?; + + Ok(()) +} diff --git a/crates/neon/src/lib.rs b/crates/neon/src/lib.rs index a471b59b5..becf920e7 100644 --- a/crates/neon/src/lib.rs +++ b/crates/neon/src/lib.rs @@ -78,6 +78,8 @@ //! [supported]: https://github.com/neon-bindings/neon#platform-support #![cfg_attr(docsrs, feature(doc_cfg))] +#[cfg(feature = "async_local")] +mod async_local; pub mod context; pub mod event; pub mod handle; diff --git a/crates/neon/src/lifecycle.rs b/crates/neon/src/lifecycle.rs index 0a08ef922..6399c2295 100644 --- a/crates/neon/src/lifecycle.rs +++ b/crates/neon/src/lifecycle.rs @@ -20,7 +20,7 @@ use std::{ use crate::{ context::Context, event::Channel, - handle::root::NapiRef, + handle::root_object::NapiRef, sys::{lifecycle, raw::Env, tsfn::ThreadsafeFunction}, types::promise::NodeApiDeferred, }; diff --git a/crates/neon/src/prelude.rs b/crates/neon/src/prelude.rs index 01b87f55e..837dc5ca1 100644 --- a/crates/neon/src/prelude.rs +++ b/crates/neon/src/prelude.rs @@ -3,8 +3,8 @@ #[doc(no_inline)] pub use crate::{ context::{ - CallKind, ComputeContext, Context, ExecuteContext, FunctionContext, ModuleContext, - TaskContext, + AsyncContext, AsyncFunctionContext, CallKind, ComputeContext, Context, ExecuteContext, + FunctionContext, ModuleContext, TaskContext, }, handle::{Handle, Root}, object::Object, diff --git a/crates/neon/src/sys/bindings/functions.rs b/crates/neon/src/sys/bindings/functions.rs index 55bc5038b..f7fda40c0 100644 --- a/crates/neon/src/sys/bindings/functions.rs +++ b/crates/neon/src/sys/bindings/functions.rs @@ -81,6 +81,8 @@ mod napi1 { fn create_range_error(env: Env, code: Value, msg: Value, result: *mut Value) -> Status; + fn create_symbol(env: Env, description: Value, result: *mut Value) -> Status; + fn create_string_utf8( env: Env, str: *const c_char, diff --git a/crates/neon/src/sys/bindings/libuv.rs b/crates/neon/src/sys/bindings/libuv.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/neon/src/sys/mod.rs b/crates/neon/src/sys/mod.rs index 10bdde4fc..5f730d68f 100644 --- a/crates/neon/src/sys/mod.rs +++ b/crates/neon/src/sys/mod.rs @@ -89,6 +89,7 @@ pub(crate) mod raw; pub(crate) mod reference; pub(crate) mod scope; pub(crate) mod string; +pub(crate) mod symbol; pub(crate) mod tag; pub(crate) mod typedarray; diff --git a/crates/neon/src/sys/symbol.rs b/crates/neon/src/sys/symbol.rs new file mode 100644 index 000000000..a0bf71c47 --- /dev/null +++ b/crates/neon/src/sys/symbol.rs @@ -0,0 +1,10 @@ +use super::{ + bindings as napi, + raw::{Env, Local}, +}; + +pub unsafe fn new(out: &mut Local, env: Env, description: Local) -> bool { + let status = napi::create_symbol(env, description, out); + + status == napi::Status::Ok +} diff --git a/crates/neon/src/sys/tag.rs b/crates/neon/src/sys/tag.rs index 91b2266a6..b918611b6 100644 --- a/crates/neon/src/sys/tag.rs +++ b/crates/neon/src/sys/tag.rs @@ -31,6 +31,11 @@ pub unsafe fn is_boolean(env: Env, val: Local) -> bool { is_type(env, val, napi::ValueType::Boolean) } +/// Is `val` a JavaScript symbol? +pub unsafe fn is_symbol(env: Env, val: Local) -> bool { + is_type(env, val, napi::ValueType::Symbol) +} + /// Is `val` a JavaScript string? pub unsafe fn is_string(env: Env, val: Local) -> bool { is_type(env, val, napi::ValueType::String) diff --git a/crates/neon/src/types_impl/mod.rs b/crates/neon/src/types_impl/mod.rs index 3f8677e51..167c09bd8 100644 --- a/crates/neon/src/types_impl/mod.rs +++ b/crates/neon/src/types_impl/mod.rs @@ -409,6 +409,58 @@ impl ValueInternal for JsBoolean { } } +/// The type of JavaScript +/// [symbol](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol) +/// primitives. +#[derive(Debug)] +#[repr(transparent)] +pub struct JsSymbol(raw::Local); + +unsafe impl TransparentNoCopyWrapper for JsSymbol { + type Inner = raw::Local; + + fn into_inner(self) -> Self::Inner { + self.0 + } +} + +impl Value for JsSymbol {} + +impl Object for JsSymbol {} + +impl ValueInternal for JsSymbol { + fn name() -> &'static str { + "symbol" + } + + fn is_typeof(env: Env, other: &Other) -> bool { + unsafe { sys::tag::is_symbol(env.to_raw(), other.to_local()) } + } + + fn to_local(&self) -> raw::Local { + self.0 + } + + unsafe fn from_local(_env: Env, h: raw::Local) -> Self { + JsSymbol(h) + } +} + +impl JsSymbol { + pub fn new<'a, C: Context<'a>, S: AsRef>( + cx: &mut C, + description: S, + ) -> Handle<'a, JsSymbol> { + let description = JsString::new(cx, description); + + unsafe { + let mut local: raw::Local = std::mem::zeroed(); + sys::symbol::new(&mut local, cx.env().to_raw(), description.to_local()); + Handle::new_internal(JsSymbol(local)) + } + } +} + /// The type of JavaScript /// [string](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Data_structures#primitive_values) /// primitives.