From 637bb5f9928b6b782306b0b1dea2a7120d6cb26c Mon Sep 17 00:00:00 2001 From: David Alsh Date: Fri, 21 Jun 2024 19:25:05 +1000 Subject: [PATCH 1/9] Added executor for Rust futures driven by Libuv --- .gitignore | 1 + Cargo.lock | 161 +++++++++++++++++++++- crates/neon/Cargo.toml | 7 +- crates/neon/src/asynch/libuv.rs | 26 ++++ crates/neon/src/asynch/mod.rs | 6 + crates/neon/src/asynch/root.rs | 76 ++++++++++ crates/neon/src/asynch/runtime.rs | 64 +++++++++ crates/neon/src/context/mod.rs | 109 +++++++++++++++ crates/neon/src/lib.rs | 2 + crates/neon/src/sys/bindings/functions.rs | 5 + crates/neon/src/sys/bindings/libuv.rs | 0 crates/neon/src/sys/bindings/mod.rs | 3 +- crates/neon/src/sys/bindings/types.rs | 13 ++ 13 files changed, 467 insertions(+), 6 deletions(-) create mode 100644 crates/neon/src/asynch/libuv.rs create mode 100644 crates/neon/src/asynch/mod.rs create mode 100644 crates/neon/src/asynch/root.rs create mode 100644 crates/neon/src/asynch/runtime.rs create mode 100644 crates/neon/src/sys/bindings/libuv.rs diff --git a/.gitignore b/.gitignore index 740828832..d012485bd 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ pkgs/create-neon/create-neon-manual-test-project test/cli/lib npm-debug.log rls*.log +.vscode diff --git a/Cargo.lock b/Cargo.lock index 065d7b326..62d51fad1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "alsh_libuv" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bc181a3a77b87d8e49d24e02fe13d3371906bfcd13820db8cbb400e48d51e8b" +dependencies = [ + "bitflags 1.2.1", + "libuv-sys2", +] + [[package]] name = "anyhow" version = "1.0.75" @@ -73,7 +83,7 @@ version = "0.65.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" dependencies = [ - "bitflags 1.3.2", + "bitflags 1.2.1", "cexpr", "clang-sys", "lazy_static", @@ -90,11 +100,31 @@ dependencies = [ "which", ] +[[package]] +name = "bindgen" +version = "0.68.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726e4313eb6ec35d2730258ad4e15b547ee75d6afaa1361a922e78e59b7d8078" +dependencies = [ + "bitflags 2.4.1", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.57", +] + [[package]] name = "bitflags" -version = "1.3.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "bitflags" @@ -178,6 +208,95 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.57", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" + +[[package]] +name = "futures-task" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" + +[[package]] +name = "futures-util" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getrandom" version = "0.2.11" @@ -297,6 +416,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libuv-sys2" +version = "1.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6125e1a220a5698a154ce76762d2ef8884baf9f77da7ceb8a3bd8c5ce27df343" +dependencies = [ + "bindgen 0.68.1", + "cc", + "pkg-config", +] + [[package]] name = "linkify" version = "0.10.0" @@ -373,10 +503,12 @@ dependencies = [ name = "neon" version = "1.0.0" dependencies = [ + "alsh_libuv", "anyhow", "aquamarine", "doc-comment", "easy-cast", + "futures", "getrandom", "libloading 0.8.1", "linkify", @@ -409,7 +541,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d252ccdd7b72dd5e92ab65471d6a26ba47375139a6071ead3dbf191f60de9903" dependencies = [ - "bindgen", + "bindgen 0.65.1", ] [[package]] @@ -506,6 +638,18 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -712,6 +856,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.2" diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index d15b86396..0dd570e5c 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -35,6 +35,9 @@ doc-comment = { version = "0.3.3", optional = true } send_wrapper = "0.6.0" serde = { version = "1.0.197", optional = true } serde_json = { version = "1.0.114", optional = true } +futures = { version = "0.3", optional = true } +# Fork of libuv bindings for Rust +libuv = { package = "alsh_libuv", version = "2", features = ["sys"], optional = true } [dependencies.tokio] version = "1.34.0" @@ -43,7 +46,7 @@ features = ["sync"] optional = true [features] -default = ["napi-8"] +default = ["napi-8", "futures"] # Enable extracting values by serializing to JSON serde = ["dep:serde", "dep:serde_json"] @@ -57,6 +60,8 @@ external-buffers = [] # https://github.com/neon-bindings/rfcs/pull/46 futures = ["tokio"] +asynch = ["dep:futures", "dep:libuv"] + # Enable low-level system APIs. The `sys` API allows augmenting the Neon API # from external crates. sys = [] diff --git a/crates/neon/src/asynch/libuv.rs b/crates/neon/src/asynch/libuv.rs new file mode 100644 index 000000000..9245f8683 --- /dev/null +++ b/crates/neon/src/asynch/libuv.rs @@ -0,0 +1,26 @@ +use std::mem::MaybeUninit; +use std::rc::Rc; + +use crate::context::internal::Env; +use crate::sys::bindings::get_uv_event_loop; +use libuv::sys::uv_loop_t; +use libuv::Loop; +use once_cell::unsync::OnceCell; + +thread_local! { + pub static LIB_UV: OnceCell> = OnceCell::new(); +} + +/// Gets a reference to Libuv +pub fn get_lib_uv<'a>(env: &Env) -> Rc { + LIB_UV.with(move |cell| { + cell.get_or_init(move || { + let mut result = MaybeUninit::uninit(); + unsafe { get_uv_event_loop(env.to_raw(), result.as_mut_ptr()) }; + let ptr = unsafe { *result.as_mut_ptr() }; + let ptr = ptr as *mut uv_loop_t; + Rc::new(unsafe { libuv::r#loop::Loop::from_external(ptr) }) + }) + .clone() + }) +} diff --git a/crates/neon/src/asynch/mod.rs b/crates/neon/src/asynch/mod.rs new file mode 100644 index 000000000..1b53d2052 --- /dev/null +++ b/crates/neon/src/asynch/mod.rs @@ -0,0 +1,6 @@ +//! This module extends Libuv to work as an executor for Rust futures +pub mod root; +mod libuv; +mod runtime; + +pub use runtime::*; diff --git a/crates/neon/src/asynch/root.rs b/crates/neon/src/asynch/root.rs new file mode 100644 index 000000000..3941e429e --- /dev/null +++ b/crates/neon/src/asynch/root.rs @@ -0,0 +1,76 @@ +/* + This is a basic method of persisting JavaScript values so + they are given a static lifetime and not collected by GC + + This is needed because not all JsValues can be called with .root() + and is probably temporary +*/ +use std::cell::RefCell; + +use crate::context::Context; +use crate::handle::Handle; +use crate::object::Object; +use crate::types::JsObject; +use crate::types::Value; +use once_cell::unsync::Lazy; + +thread_local! { + pub static THREAD_LOCAL_COUNT: Lazy> = Lazy::new(|| RefCell::new(0)); + pub static GLOBAL_KEY: Lazy = Lazy::new(|| { + let mut lower = [0; std::mem::size_of::()]; + getrandom::getrandom(&mut lower).expect("Unable to generate number"); + let lower = u16::from_ne_bytes(lower); + format!("__neon_root_cache_{}", lower) + }); +} + +fn ref_count_inc() -> usize { + THREAD_LOCAL_COUNT.with(|c| { + let mut c = c.borrow_mut(); + let current = (*c).clone(); + *c += 1; + current + }) +} + +pub fn root<'a>(cx: &mut impl Context<'a>) -> Handle<'a, JsObject> { + let global = cx.global_object(); + let key = GLOBAL_KEY.with(|k| cx.string(&*k.as_str())); + match global.get_opt(cx, key).unwrap() { + Some(obj) => obj, + None => { + let init = cx.empty_object(); + global.set(cx, key, init).unwrap(); + global.get_opt(cx, key).unwrap().unwrap() + } + } +} + +#[derive(Clone, Debug)] +pub struct RootGlobal { + inner: String, +} + +impl RootGlobal { + pub fn new<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> Self { + let index = ref_count_inc(); + let key_str = format!("{}", index); + let key = cx.string(&key_str); + let cache = root(cx); + cache.set(cx, key, value).unwrap(); + Self { inner: key_str } + } + + pub fn into_inner<'a, V: Value>(&self, cx: &mut impl Context<'a>) -> Handle<'a, V> { + let key = cx.string(&self.inner); + let cache = root(cx); + cache.get(cx, key).unwrap() + } + + pub fn remove<'a>(&self, cx: &mut impl Context<'a>) -> bool { + let key = cx.string(&self.inner); + let val = cx.undefined(); + let cache = root(cx); + cache.set(cx, key, val).unwrap() + } +} diff --git a/crates/neon/src/asynch/runtime.rs b/crates/neon/src/asynch/runtime.rs new file mode 100644 index 000000000..d6a6c477c --- /dev/null +++ b/crates/neon/src/asynch/runtime.rs @@ -0,0 +1,64 @@ +use std::cell::RefCell; +use std::future::Future; + +use crate::context::internal::Env; +use futures::task::LocalSpawnExt; +use futures::executor::LocalSpawner; +use futures::executor::LocalPool; +use once_cell::unsync::Lazy; + +use super::libuv::get_lib_uv; + +thread_local! { + static LOCAL_POOL: Lazy> = Lazy::new(|| RefCell::new(LocalPool::new())); + static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner()) ); + static TASK_COUNT: Lazy> = Lazy::new(|| Default::default() ); +} + +pub fn spawn_async_local(env: &Env, future: impl Future + 'static) { + SPAWNER.with(|ls| { + ls.spawn_local(async { + future.await; + task_count_dec(); + }) + .unwrap(); + }); + + // Delegate non-blocking polling of futures to libuv + if task_count_inc() != 0 { + return; + } + + // Idle handle refers to a libuv task that runs while "idling". + // This is not an idle state, rather an analogy to a car engine + let uv = get_lib_uv(env); + let mut task = uv.idle().unwrap(); + + // The idle task will conduct a non-blocking poll of all local futures + // and continue on pending futures allowing the poll to be non-blocking. + // This repeats until no more futures are pending in the local set. + task.start(move |mut task: libuv::IdleHandle| { + if task_count() != 0 { + LOCAL_POOL.with(|lp| lp.borrow_mut().run_until_stalled()); + } else { + task.stop().unwrap(); + } + }) + .unwrap(); +} + +fn task_count() -> usize { + TASK_COUNT.with(|c| *c.borrow_mut()) +} + +fn task_count_inc() -> usize { + let current = task_count(); + TASK_COUNT.with(|c| *c.borrow_mut() += 1); + current +} + +fn task_count_dec() -> usize { + let current = task_count(); + TASK_COUNT.with(|c| *c.borrow_mut() -= 1); + current +} diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index 0d936ed3f..8e08be635 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -145,6 +145,11 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe}; pub use crate::types::buffer::lock::Lock; +#[cfg(feature = "asynch")] +use crate::asynch::{root::RootGlobal, spawn_async_local}; +#[cfg(feature = "asynch")] +use futures::Future; + use crate::{ event::TaskBuilder, handle::Handle, @@ -290,6 +295,24 @@ pub trait Context<'a>: ContextInternal<'a> { result } + #[cfg(feature = "asynch")] + fn execute_async(&mut self, f: F) + where + Fut: Future, + F: FnOnce(AsyncContext) -> Fut + 'static, + { + use futures::Future; + + let env = self.env(); + + spawn_async_local(&env, async move { + let scope = unsafe { HandleScope::new(env.to_raw()) }; + let future = f(AsyncContext { env }); + future.await; + drop(scope); + }); + } + /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. /// /// Handles created in the new scope are kept alive only for the duration of the computation and cannot escape, with the exception of the result value, which is rooted in the outer context. @@ -593,6 +616,39 @@ impl<'a> ModuleContext<'a> { Ok(()) } + #[cfg(feature = "asynch")] + pub fn export_function_async<'b, F, V, Fut>(&mut self, key: &str, f: F) -> NeonResult<()> + where + Fut: Future>, + F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy, + V: Value, + { + let wrapper = JsFunction::new(self, move |mut cx| { + let mut args = vec![]; + + while let Some(arg) = cx.argument_opt(args.len()) { + let arg = arg.as_value(&mut cx); + let arg = RootGlobal::new(&mut cx, arg); + args.push(arg); + } + + let (deferred, promise) = cx.promise(); + cx.execute_async(move |mut cx| async move { + let acx = AsyncFunctionContext { + env: cx.env(), + arguments: args, + }; + deferred.resolve(&mut cx, f(acx).await.unwrap()); + () + }); + + Ok(promise) + })?; + + self.exports.clone().set(self, key, wrapper)?; + Ok(()) + } + /// Exports a JavaScript value from a Neon module. pub fn export_value(&mut self, key: &str, val: Handle) -> NeonResult<()> { self.exports.clone().set(self, key, val)?; @@ -627,6 +683,22 @@ impl<'a> ContextInternal<'a> for ExecuteContext<'a> { impl<'a> Context<'a> for ExecuteContext<'a> {} +/// An execution context of a scope created by [`Context::execute_async()`](Context::execute_async). +#[cfg(feature = "asynch")] +pub struct AsyncContext { + env: Env, +} + +#[cfg(feature = "asynch")] +impl<'a> ContextInternal<'static> for AsyncContext { + fn env(&self) -> Env { + self.env + } +} + +#[cfg(feature = "asynch")] +impl Context<'static> for AsyncContext {} + /// An execution context of a scope created by [`Context::compute_scoped()`](Context::compute_scoped). pub struct ComputeContext<'a> { env: Env, @@ -773,6 +845,43 @@ impl<'a> ContextInternal<'a> for FunctionContext<'a> { impl<'a> Context<'a> for FunctionContext<'a> {} +/// An execution context of an async function call. +/// +/// The type parameter `T` is the type of the `this`-binding. +#[cfg(feature = "asynch")] +pub struct AsyncFunctionContext { + env: Env, + arguments: Vec, +} + +#[cfg(feature = "asynch")] +impl<'a> AsyncFunctionContext { + pub fn argument(&mut self, i: usize) -> JsResult<'a, V> { + let arg = self.arguments.get(i).unwrap().clone(); + let handle = arg.into_inner(self); + Ok(handle) + } +} + +#[cfg(feature = "asynch")] +impl<'a> ContextInternal<'a> for AsyncFunctionContext { + fn env(&self) -> Env { + self.env + } +} + +#[cfg(feature = "asynch")] +impl<'a> Context<'a> for AsyncFunctionContext {} + +#[cfg(feature = "asynch")] +impl Drop for AsyncFunctionContext { + fn drop(&mut self) { + while let Some(arg) = self.arguments.pop() { + arg.remove(self); + } + } +} + /// An execution context of a task completion callback. pub struct TaskContext<'a> { env: Env, diff --git a/crates/neon/src/lib.rs b/crates/neon/src/lib.rs index a471b59b5..fb6220219 100644 --- a/crates/neon/src/lib.rs +++ b/crates/neon/src/lib.rs @@ -78,6 +78,8 @@ //! [supported]: https://github.com/neon-bindings/neon#platform-support #![cfg_attr(docsrs, feature(doc_cfg))] +#[cfg(feature = "asynch")] +mod asynch; pub mod context; pub mod event; pub mod handle; diff --git a/crates/neon/src/sys/bindings/functions.rs b/crates/neon/src/sys/bindings/functions.rs index 55bc5038b..c89be36ac 100644 --- a/crates/neon/src/sys/bindings/functions.rs +++ b/crates/neon/src/sys/bindings/functions.rs @@ -266,6 +266,11 @@ mod napi4 { generate!( #[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] extern "C" { + fn get_uv_event_loop( + env: Env, + uv_loop: *mut UvEventLoop, + ); + fn create_threadsafe_function( env: Env, func: Value, diff --git a/crates/neon/src/sys/bindings/libuv.rs b/crates/neon/src/sys/bindings/libuv.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/neon/src/sys/bindings/mod.rs b/crates/neon/src/sys/bindings/mod.rs index cdd8559e1..3b806fe4e 100644 --- a/crates/neon/src/sys/bindings/mod.rs +++ b/crates/neon/src/sys/bindings/mod.rs @@ -42,7 +42,7 @@ macro_rules! napi_name { /// ```ignore /// extern "C" { /// fn get_undefined(env: Env, result: *mut Value) -> Status; -/// /* Additional functions may be included */ +/// /* Additional functions may be included */ /// } /// ``` /// @@ -177,3 +177,4 @@ pub use self::{functions::*, types::*}; mod functions; mod types; +mod libuv; diff --git a/crates/neon/src/sys/bindings/types.rs b/crates/neon/src/sys/bindings/types.rs index a167c9282..d3b2aead0 100644 --- a/crates/neon/src/sys/bindings/types.rs +++ b/crates/neon/src/sys/bindings/types.rs @@ -66,6 +66,19 @@ pub struct Ref__ { /// [`napi_ref`](https://nodejs.org/api/n-api.html#napi_ref) pub type Ref = *mut Ref__; +#[cfg(feature = "napi-4")] +#[repr(C)] +#[derive(Debug, Copy, Clone)] +#[doc(hidden)] +pub struct UvEventLoop__ { + _unused: [u8; 0], +} + +#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] +#[cfg(feature = "napi-4")] +/// [`napi_threadsafe_function`](https://nodejs.org/api/n-api.html#napi_threadsafe_function) +pub type UvEventLoop = *mut UvEventLoop__; + #[cfg(feature = "napi-4")] #[repr(C)] #[derive(Debug, Copy, Clone)] From 62757a4dd25069d3be3b2e11ccf674c5429e3c0f Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 09:24:11 +1000 Subject: [PATCH 2/9] async local feature --- Cargo.lock | 50 +--- crates/neon/Cargo.toml | 4 +- crates/neon/src/async_local/executor/enter.rs | 53 ++++ .../src/async_local/executor/local_pool.rs | 230 ++++++++++++++++++ crates/neon/src/async_local/executor/mod.rs | 4 + crates/neon/src/async_local/mod.rs | 53 ++++ .../src/async_local/runtime/futures_count.rs | 35 +++ crates/neon/src/async_local/runtime/mod.rs | 7 + .../neon/src/async_local/runtime/runtime.rs | 27 ++ crates/neon/src/async_local/runtime/waker.rs | 75 ++++++ crates/neon/src/asynch/libuv.rs | 26 -- crates/neon/src/asynch/mod.rs | 6 - crates/neon/src/asynch/root.rs | 76 ------ crates/neon/src/asynch/runtime.rs | 64 ----- crates/neon/src/context/mod.rs | 105 ++++---- crates/neon/src/handle/js_rc.rs | 146 +++++++++++ crates/neon/src/handle/mod.rs | 2 + crates/neon/src/lib.rs | 4 +- 18 files changed, 691 insertions(+), 276 deletions(-) create mode 100644 crates/neon/src/async_local/executor/enter.rs create mode 100644 crates/neon/src/async_local/executor/local_pool.rs create mode 100644 crates/neon/src/async_local/executor/mod.rs create mode 100644 crates/neon/src/async_local/mod.rs create mode 100644 crates/neon/src/async_local/runtime/futures_count.rs create mode 100644 crates/neon/src/async_local/runtime/mod.rs create mode 100644 crates/neon/src/async_local/runtime/runtime.rs create mode 100644 crates/neon/src/async_local/runtime/waker.rs delete mode 100644 crates/neon/src/asynch/libuv.rs delete mode 100644 crates/neon/src/asynch/mod.rs delete mode 100644 crates/neon/src/asynch/root.rs delete mode 100644 crates/neon/src/asynch/runtime.rs create mode 100644 crates/neon/src/handle/js_rc.rs diff --git a/Cargo.lock b/Cargo.lock index 62d51fad1..b26cfc035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,16 +26,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "alsh_libuv" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bc181a3a77b87d8e49d24e02fe13d3371906bfcd13820db8cbb400e48d51e8b" -dependencies = [ - "bitflags 1.2.1", - "libuv-sys2", -] - [[package]] name = "anyhow" version = "1.0.75" @@ -100,26 +90,6 @@ dependencies = [ "which", ] -[[package]] -name = "bindgen" -version = "0.68.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "726e4313eb6ec35d2730258ad4e15b547ee75d6afaa1361a922e78e59b7d8078" -dependencies = [ - "bitflags 2.4.1", - "cexpr", - "clang-sys", - "lazy_static", - "lazycell", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", - "syn 2.0.57", -] - [[package]] name = "bitflags" version = "1.2.1" @@ -416,17 +386,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libuv-sys2" -version = "1.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6125e1a220a5698a154ce76762d2ef8884baf9f77da7ceb8a3bd8c5ce27df343" -dependencies = [ - "bindgen 0.68.1", - "cc", - "pkg-config", -] - [[package]] name = "linkify" version = "0.10.0" @@ -503,7 +462,6 @@ dependencies = [ name = "neon" version = "1.0.0" dependencies = [ - "alsh_libuv", "anyhow", "aquamarine", "doc-comment", @@ -541,7 +499,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d252ccdd7b72dd5e92ab65471d6a26ba47375139a6071ead3dbf191f60de9903" dependencies = [ - "bindgen 0.65.1", + "bindgen", ] [[package]] @@ -644,12 +602,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkg-config" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" - [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index 0dd570e5c..493498714 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -36,8 +36,6 @@ send_wrapper = "0.6.0" serde = { version = "1.0.197", optional = true } serde_json = { version = "1.0.114", optional = true } futures = { version = "0.3", optional = true } -# Fork of libuv bindings for Rust -libuv = { package = "alsh_libuv", version = "2", features = ["sys"], optional = true } [dependencies.tokio] version = "1.34.0" @@ -60,7 +58,7 @@ external-buffers = [] # https://github.com/neon-bindings/rfcs/pull/46 futures = ["tokio"] -asynch = ["dep:futures", "dep:libuv"] +async_local = ["dep:futures"] # Enable low-level system APIs. The `sys` API allows augmenting the Neon API # from external crates. diff --git a/crates/neon/src/async_local/executor/enter.rs b/crates/neon/src/async_local/executor/enter.rs new file mode 100644 index 000000000..24063e9f8 --- /dev/null +++ b/crates/neon/src/async_local/executor/enter.rs @@ -0,0 +1,53 @@ +use std::cell::Cell; +use std::fmt; + +std::thread_local!(static ENTERED: Cell = Cell::new(false)); + +pub struct Enter { + _priv: (), +} + +pub struct EnterError { + _priv: (), +} + +impl fmt::Debug for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EnterError").finish() + } +} + +impl fmt::Display for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "an execution scope has already been entered") + } +} + +impl std::error::Error for EnterError {} + +pub fn enter() -> Result { + ENTERED.with(|c| { + if c.get() { + Err(EnterError { _priv: () }) + } else { + c.set(true); + + Ok(Enter { _priv: () }) + } + }) +} + +impl fmt::Debug for Enter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for Enter { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(c.get()); + c.set(false); + }); + } +} diff --git a/crates/neon/src/async_local/executor/local_pool.rs b/crates/neon/src/async_local/executor/local_pool.rs new file mode 100644 index 000000000..6599c2ebc --- /dev/null +++ b/crates/neon/src/async_local/executor/local_pool.rs @@ -0,0 +1,230 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::rc::Weak; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; +use std::thread::Thread; +use std::vec::Vec; + +use futures::stream::FuturesUnordered; +use futures::stream::StreamExt; +use futures::task::waker_ref; +use futures::task::ArcWake; +use futures::task::Context; +use futures::task::FutureObj; +use futures::task::LocalFutureObj; +use futures::task::LocalSpawn; +use futures::task::Poll; +use futures::task::Spawn; +use futures::task::SpawnError; + +use super::enter::enter; + +#[derive(Debug)] +pub struct LocalPool { + pool: FuturesUnordered>, + incoming: Rc, +} + +#[derive(Clone, Debug)] +pub struct LocalSpawner { + incoming: Weak, +} + +type Incoming = RefCell>>; + +#[derive(Debug)] +pub(crate) struct ThreadNotify { + pub(crate) thread: Thread, + pub(crate) unparked: AtomicBool, +} + +pub type ThreadNotifyRef = Arc; + +impl ThreadNotify { + pub fn new() -> Arc { + Arc::new(ThreadNotify { + thread: thread::current(), + unparked: AtomicBool::new(false), + }) + } +} + +impl ArcWake for ThreadNotify { + fn wake_by_ref(arc_self: &Arc) { + let unparked = arc_self.unparked.swap(true, Ordering::Release); + if !unparked { + arc_self.thread.unpark(); + } + } +} + +pub fn wait_for_wake(thread_notify: &ThreadNotify) { + while !thread_notify.unparked.swap(false, Ordering::Acquire) { + std::thread::park(); + } +} + +fn woken(thread_notify: &ThreadNotify) -> bool { + thread_notify.unparked.load(Ordering::Acquire) +} + +fn run_executor) -> Poll>( + thread_notify: Arc, + mut f: F, +) -> T { + let _enter = enter().expect( + "cannot execute `LocalPool` executor from within \ + another executor", + ); + + let waker = waker_ref(&thread_notify); + let mut cx = Context::from_waker(&waker); + + loop { + if let Poll::Ready(t) = f(&mut cx) { + return t; + } + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { + // No wakeup occurred. It may occur now, right before parking, + // but in that case the token made available by `unpark()` + // is guaranteed to still be available and `park()` is a no-op. + thread::park(); + } + } +} + +impl LocalPool { + pub fn new() -> Self { + Self { + pool: FuturesUnordered::new(), + incoming: Default::default(), + } + } + + pub fn spawner(&self) -> LocalSpawner { + LocalSpawner { + incoming: Rc::downgrade(&self.incoming), + } + } + + // Note: Can be used to interleave futures with the JS event loop + /// Runs all tasks and returns after completing one future or until no more progress + /// can be made. Returns `true` if one future was completed, `false` otherwise. + #[allow(unused)] + pub fn try_run_one(&mut self, thread_notify: Arc) -> bool { + run_executor(thread_notify.clone(), |cx| { + loop { + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), + } + + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken(&thread_notify) { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); + } + } + }) + } + + /// Runs all tasks in the pool and returns if no more progress can be made on any task. + #[allow(unused)] + pub fn run_until_stalled(&mut self, thread_notify: Arc) { + run_executor(thread_notify.clone(), |cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken(&thread_notify) { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } + }); + } + + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { + loop { + self.drain_incoming(); + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. + if !self.incoming.borrow().is_empty() { + continue; + } + + match pool_ret { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + } + } + + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) + } + } +} + +impl Default for LocalPool { + fn default() -> Self { + Self::new() + } +} + +impl Spawn for LocalSpawner { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future.into()); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} + +impl LocalSpawn for LocalSpawner { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status_local(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} diff --git a/crates/neon/src/async_local/executor/mod.rs b/crates/neon/src/async_local/executor/mod.rs new file mode 100644 index 000000000..3a933d0eb --- /dev/null +++ b/crates/neon/src/async_local/executor/mod.rs @@ -0,0 +1,4 @@ +mod enter; +mod local_pool; + +pub use self::local_pool::*; diff --git a/crates/neon/src/async_local/mod.rs b/crates/neon/src/async_local/mod.rs new file mode 100644 index 000000000..3893e9d94 --- /dev/null +++ b/crates/neon/src/async_local/mod.rs @@ -0,0 +1,53 @@ +mod executor; +mod runtime; + +use std::future::Future; + +use self::runtime::LocalFuturesCount; +use self::runtime::LocalRuntime; +use crate::context::Context; +use crate::sys; +use crate::types::JsFunction; + +pub(crate) fn spawn_async_local<'a>( + cx: &mut impl Context<'a>, + future: impl Future + 'static, +) -> Result<(), ()> { + let future = async move { + future.await; + LocalFuturesCount::decrement(); + }; + + LocalRuntime::spawn_local(future).unwrap(); + + // If there are tasks in flight then the + // executor is already initialized + if LocalFuturesCount::count() != 0 { + return Ok(()); + } + + // Start the futures digest cycle + let env_raw = cx.env().to_raw(); + + unsafe { sys::fun::new(env_raw, "", |env, _| { + + }) }; + + // unsafe { + // sys::create_threadsafe_function( + // env_raw, + // func, + // async_resource, + // async_resource_name, + // max_queue_size, + // initial_thread_count, + // thread_finalize_data, + // thread_finalize_cb, + // context, + // call_js_cb, + // result, + // ) + // } + + Ok(()) +} diff --git a/crates/neon/src/async_local/runtime/futures_count.rs b/crates/neon/src/async_local/runtime/futures_count.rs new file mode 100644 index 000000000..51f5fbcb8 --- /dev/null +++ b/crates/neon/src/async_local/runtime/futures_count.rs @@ -0,0 +1,35 @@ +use once_cell::unsync::Lazy; +use std::cell::RefCell; + +thread_local! { + static FUTURES_COUNT: Lazy> = Lazy::default(); +} + +/// Local counter of the futures running on the current thread +/// +/// By default the futures executor will keep the Nodejs process open. +/// This is desirable to avoid Nodejs from exiting before the futures +/// complete their execution. +/// +/// For this reason, we need to keep a local counter of how many futures +/// are currently in flight so we can unref the executor when all of +/// the promises are settled. +pub struct LocalFuturesCount; + +impl LocalFuturesCount { + pub fn count() -> usize { + FUTURES_COUNT.with(|c| *c.borrow_mut()) + } + + pub fn increment() -> usize { + let futures_count = LocalFuturesCount::count(); + FUTURES_COUNT.with(|c| *c.borrow_mut() += 1); + futures_count + } + + pub fn decrement() -> usize { + let futures_count = LocalFuturesCount::count(); + FUTURES_COUNT.with(|c| *c.borrow_mut() -= 1); + futures_count + } +} diff --git a/crates/neon/src/async_local/runtime/mod.rs b/crates/neon/src/async_local/runtime/mod.rs new file mode 100644 index 000000000..2288d15cd --- /dev/null +++ b/crates/neon/src/async_local/runtime/mod.rs @@ -0,0 +1,7 @@ +mod futures_count; +mod runtime; +mod waker; + +pub (super) use futures_count::*; +pub (super) use runtime::*; +pub (super) use waker::*; diff --git a/crates/neon/src/async_local/runtime/runtime.rs b/crates/neon/src/async_local/runtime/runtime.rs new file mode 100644 index 000000000..342980317 --- /dev/null +++ b/crates/neon/src/async_local/runtime/runtime.rs @@ -0,0 +1,27 @@ +use std::cell::RefCell; +use std::future::Future; + +use super::super::executor::LocalPool; +use super::super::executor::LocalSpawner; +use super::super::executor::ThreadNotifyRef; +use futures::task::LocalSpawnExt; +use futures::task::SpawnError; +use once_cell::unsync::Lazy; + +thread_local! { + static LOCAL_POOL: Lazy> = Lazy::default(); + static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner())); +} + +pub struct LocalRuntime; + +impl LocalRuntime { + pub fn spawn_local(future: impl Future + 'static) -> Result<(), SpawnError> { + SPAWNER.with(move |ls| ls.spawn_local(future)) + } + + pub fn run_until_stalled(thread_notify: ThreadNotifyRef) { + LOCAL_POOL.with(move |lp| lp.borrow_mut().run_until_stalled(thread_notify)); + } +} + diff --git a/crates/neon/src/async_local/runtime/waker.rs b/crates/neon/src/async_local/runtime/waker.rs new file mode 100644 index 000000000..403667856 --- /dev/null +++ b/crates/neon/src/async_local/runtime/waker.rs @@ -0,0 +1,75 @@ +use std::sync::mpsc::channel; +use std::sync::mpsc::SendError; +use std::sync::mpsc::Sender; +use std::sync::Arc; +use std::thread; + +use super::super::executor::wait_for_wake; +use super::super::executor::ThreadNotify; +use once_cell::unsync::Lazy; + +thread_local! { + static WAKER_THREAD: Lazy> = Lazy::new(LocalWaker::start_waker_thread); +} + +pub type WakerInit = Arc; + +pub enum WakerEvent { + Init(WakerInit), + Next, + Done, +} + +/// The futures waker that coordinates with the futures executor to notify +/// the main thread to pause and resume execution of futures. +/// +/// The waker is implemented as a dedicated system thread which is parked +/// by the local futures executor while waiting for futures to resume work. +/// +/// Once woken up, the waker resumes execution of futures on the JavaScript +/// thread by triggering a napi threadsafe function to poll the futures in +/// the local pool until no more progress can be made before yielding back +/// to the Nodejs event loop. +/// +/// This allows for the execution of Rust futures to integrate with the +/// Nodejs event loop without blocking either +pub struct LocalWaker; + +impl LocalWaker { + pub fn send(event: WakerEvent) -> Result<(), SendError> { + WAKER_THREAD.with(|tx| tx.send(event)) + } + + fn start_waker_thread() -> Sender { + let (tx, rx) = channel(); + + // Dedicated waker thread to use for waiting on pending futures + thread::spawn(move || { + let thread_notify = ThreadNotify::new(); + let mut handle = None::; + + while let Ok(event) = rx.recv() { + match event { + WakerEvent::Init(incoming) => { + if handle.replace(incoming).is_some() { + // Error + }; + // Call JS + } + WakerEvent::Next => { + wait_for_wake(&thread_notify); + // Call JS + } + WakerEvent::Done => { + if let Some(handle) = handle.take() { + drop(handle); + } + } + }; + } + }); + + tx + } +} + diff --git a/crates/neon/src/asynch/libuv.rs b/crates/neon/src/asynch/libuv.rs deleted file mode 100644 index 9245f8683..000000000 --- a/crates/neon/src/asynch/libuv.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::mem::MaybeUninit; -use std::rc::Rc; - -use crate::context::internal::Env; -use crate::sys::bindings::get_uv_event_loop; -use libuv::sys::uv_loop_t; -use libuv::Loop; -use once_cell::unsync::OnceCell; - -thread_local! { - pub static LIB_UV: OnceCell> = OnceCell::new(); -} - -/// Gets a reference to Libuv -pub fn get_lib_uv<'a>(env: &Env) -> Rc { - LIB_UV.with(move |cell| { - cell.get_or_init(move || { - let mut result = MaybeUninit::uninit(); - unsafe { get_uv_event_loop(env.to_raw(), result.as_mut_ptr()) }; - let ptr = unsafe { *result.as_mut_ptr() }; - let ptr = ptr as *mut uv_loop_t; - Rc::new(unsafe { libuv::r#loop::Loop::from_external(ptr) }) - }) - .clone() - }) -} diff --git a/crates/neon/src/asynch/mod.rs b/crates/neon/src/asynch/mod.rs deleted file mode 100644 index 1b53d2052..000000000 --- a/crates/neon/src/asynch/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! This module extends Libuv to work as an executor for Rust futures -pub mod root; -mod libuv; -mod runtime; - -pub use runtime::*; diff --git a/crates/neon/src/asynch/root.rs b/crates/neon/src/asynch/root.rs deleted file mode 100644 index 3941e429e..000000000 --- a/crates/neon/src/asynch/root.rs +++ /dev/null @@ -1,76 +0,0 @@ -/* - This is a basic method of persisting JavaScript values so - they are given a static lifetime and not collected by GC - - This is needed because not all JsValues can be called with .root() - and is probably temporary -*/ -use std::cell::RefCell; - -use crate::context::Context; -use crate::handle::Handle; -use crate::object::Object; -use crate::types::JsObject; -use crate::types::Value; -use once_cell::unsync::Lazy; - -thread_local! { - pub static THREAD_LOCAL_COUNT: Lazy> = Lazy::new(|| RefCell::new(0)); - pub static GLOBAL_KEY: Lazy = Lazy::new(|| { - let mut lower = [0; std::mem::size_of::()]; - getrandom::getrandom(&mut lower).expect("Unable to generate number"); - let lower = u16::from_ne_bytes(lower); - format!("__neon_root_cache_{}", lower) - }); -} - -fn ref_count_inc() -> usize { - THREAD_LOCAL_COUNT.with(|c| { - let mut c = c.borrow_mut(); - let current = (*c).clone(); - *c += 1; - current - }) -} - -pub fn root<'a>(cx: &mut impl Context<'a>) -> Handle<'a, JsObject> { - let global = cx.global_object(); - let key = GLOBAL_KEY.with(|k| cx.string(&*k.as_str())); - match global.get_opt(cx, key).unwrap() { - Some(obj) => obj, - None => { - let init = cx.empty_object(); - global.set(cx, key, init).unwrap(); - global.get_opt(cx, key).unwrap().unwrap() - } - } -} - -#[derive(Clone, Debug)] -pub struct RootGlobal { - inner: String, -} - -impl RootGlobal { - pub fn new<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> Self { - let index = ref_count_inc(); - let key_str = format!("{}", index); - let key = cx.string(&key_str); - let cache = root(cx); - cache.set(cx, key, value).unwrap(); - Self { inner: key_str } - } - - pub fn into_inner<'a, V: Value>(&self, cx: &mut impl Context<'a>) -> Handle<'a, V> { - let key = cx.string(&self.inner); - let cache = root(cx); - cache.get(cx, key).unwrap() - } - - pub fn remove<'a>(&self, cx: &mut impl Context<'a>) -> bool { - let key = cx.string(&self.inner); - let val = cx.undefined(); - let cache = root(cx); - cache.set(cx, key, val).unwrap() - } -} diff --git a/crates/neon/src/asynch/runtime.rs b/crates/neon/src/asynch/runtime.rs deleted file mode 100644 index d6a6c477c..000000000 --- a/crates/neon/src/asynch/runtime.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::cell::RefCell; -use std::future::Future; - -use crate::context::internal::Env; -use futures::task::LocalSpawnExt; -use futures::executor::LocalSpawner; -use futures::executor::LocalPool; -use once_cell::unsync::Lazy; - -use super::libuv::get_lib_uv; - -thread_local! { - static LOCAL_POOL: Lazy> = Lazy::new(|| RefCell::new(LocalPool::new())); - static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner()) ); - static TASK_COUNT: Lazy> = Lazy::new(|| Default::default() ); -} - -pub fn spawn_async_local(env: &Env, future: impl Future + 'static) { - SPAWNER.with(|ls| { - ls.spawn_local(async { - future.await; - task_count_dec(); - }) - .unwrap(); - }); - - // Delegate non-blocking polling of futures to libuv - if task_count_inc() != 0 { - return; - } - - // Idle handle refers to a libuv task that runs while "idling". - // This is not an idle state, rather an analogy to a car engine - let uv = get_lib_uv(env); - let mut task = uv.idle().unwrap(); - - // The idle task will conduct a non-blocking poll of all local futures - // and continue on pending futures allowing the poll to be non-blocking. - // This repeats until no more futures are pending in the local set. - task.start(move |mut task: libuv::IdleHandle| { - if task_count() != 0 { - LOCAL_POOL.with(|lp| lp.borrow_mut().run_until_stalled()); - } else { - task.stop().unwrap(); - } - }) - .unwrap(); -} - -fn task_count() -> usize { - TASK_COUNT.with(|c| *c.borrow_mut()) -} - -fn task_count_inc() -> usize { - let current = task_count(); - TASK_COUNT.with(|c| *c.borrow_mut() += 1); - current -} - -fn task_count_dec() -> usize { - let current = task_count(); - TASK_COUNT.with(|c| *c.borrow_mut() -= 1); - current -} diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index 8e08be635..a520a89b4 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -145,9 +145,9 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe}; pub use crate::types::buffer::lock::Lock; -#[cfg(feature = "asynch")] -use crate::asynch::{root::RootGlobal, spawn_async_local}; -#[cfg(feature = "asynch")] +// #[cfg(feature = "async_local")] +// use crate::async_local::{root::RootGlobal, spawn_async_local}; +#[cfg(feature = "async_local")] use futures::Future; use crate::{ @@ -295,8 +295,8 @@ pub trait Context<'a>: ContextInternal<'a> { result } - #[cfg(feature = "asynch")] - fn execute_async(&mut self, f: F) + #[cfg(feature = "async_local")] + fn execute_async_local(&mut self, f: F) where Fut: Future, F: FnOnce(AsyncContext) -> Fut + 'static, @@ -305,12 +305,12 @@ pub trait Context<'a>: ContextInternal<'a> { let env = self.env(); - spawn_async_local(&env, async move { - let scope = unsafe { HandleScope::new(env.to_raw()) }; - let future = f(AsyncContext { env }); - future.await; - drop(scope); - }); + // spawn_async_local(&env, async move { + // let scope = unsafe { HandleScope::new(env.to_raw()) }; + // let future = f(AsyncContext { env }); + // future.await; + // drop(scope); + // }); } /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. @@ -499,6 +499,10 @@ pub trait Context<'a>: ContextInternal<'a> { JsBox::new(self, v) } + // fn rc() -> Handle<> { + + // } + #[cfg(feature = "napi-4")] #[deprecated(since = "0.9.0", note = "Please use the channel() method instead")] #[doc(hidden)] @@ -616,36 +620,36 @@ impl<'a> ModuleContext<'a> { Ok(()) } - #[cfg(feature = "asynch")] + #[cfg(feature = "async_local")] pub fn export_function_async<'b, F, V, Fut>(&mut self, key: &str, f: F) -> NeonResult<()> where Fut: Future>, F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy, V: Value, { - let wrapper = JsFunction::new(self, move |mut cx| { - let mut args = vec![]; - - while let Some(arg) = cx.argument_opt(args.len()) { - let arg = arg.as_value(&mut cx); - let arg = RootGlobal::new(&mut cx, arg); - args.push(arg); - } - - let (deferred, promise) = cx.promise(); - cx.execute_async(move |mut cx| async move { - let acx = AsyncFunctionContext { - env: cx.env(), - arguments: args, - }; - deferred.resolve(&mut cx, f(acx).await.unwrap()); - () - }); - - Ok(promise) - })?; - - self.exports.clone().set(self, key, wrapper)?; + // let wrapper = JsFunction::new(self, move |mut cx| { + // let mut args = vec![]; + + // while let Some(arg) = cx.argument_opt(args.len()) { + // let arg = arg.as_value(&mut cx); + // let arg = RootGlobal::new(&mut cx, arg); + // args.push(arg); + // } + + // let (deferred, promise) = cx.promise(); + // cx.execute_async_local(move |mut cx| async move { + // let acx = AsyncFunctionContext { + // env: cx.env(), + // arguments: args, + // }; + // deferred.resolve(&mut cx, f(acx).await.unwrap()); + // () + // }); + + // Ok(promise) + // })?; + + // self.exports.clone().set(self, key, wrapper)?; Ok(()) } @@ -684,19 +688,19 @@ impl<'a> ContextInternal<'a> for ExecuteContext<'a> { impl<'a> Context<'a> for ExecuteContext<'a> {} /// An execution context of a scope created by [`Context::execute_async()`](Context::execute_async). -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] pub struct AsyncContext { env: Env, } -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] impl<'a> ContextInternal<'static> for AsyncContext { fn env(&self) -> Env { self.env } } -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] impl Context<'static> for AsyncContext {} /// An execution context of a scope created by [`Context::compute_scoped()`](Context::compute_scoped). @@ -848,37 +852,38 @@ impl<'a> Context<'a> for FunctionContext<'a> {} /// An execution context of an async function call. /// /// The type parameter `T` is the type of the `this`-binding. -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] pub struct AsyncFunctionContext { env: Env, - arguments: Vec, + // arguments: Vec, } -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] impl<'a> AsyncFunctionContext { pub fn argument(&mut self, i: usize) -> JsResult<'a, V> { - let arg = self.arguments.get(i).unwrap().clone(); - let handle = arg.into_inner(self); - Ok(handle) + // let arg = self.arguments.get(i).unwrap().clone(); + // let handle = arg.into_inner(self); + // Ok(handle) + todo!() } } -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] impl<'a> ContextInternal<'a> for AsyncFunctionContext { fn env(&self) -> Env { self.env } } -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] impl<'a> Context<'a> for AsyncFunctionContext {} -#[cfg(feature = "asynch")] +#[cfg(feature = "async_local")] impl Drop for AsyncFunctionContext { fn drop(&mut self) { - while let Some(arg) = self.arguments.pop() { - arg.remove(self); - } + // while let Some(arg) = self.arguments.pop() { + // arg.remove(self); + // } } } diff --git a/crates/neon/src/handle/js_rc.rs b/crates/neon/src/handle/js_rc.rs new file mode 100644 index 000000000..21ee76cb4 --- /dev/null +++ b/crates/neon/src/handle/js_rc.rs @@ -0,0 +1,146 @@ +// use once_cell::unsync::{Lazy, OnceCell}; +// use std::ops::Deref; +// use std::rc::Rc; +// use std::{cell::RefCell, marker::PhantomData}; + +// use super::Value; +// use crate::context::internal::Env; +// use crate::context::{Context, ExecuteContext}; +// use crate::object::Object; +// use crate::prelude::Handle; +// use crate::result::NeonResult; +// use crate::types::{JsNumber, JsObject}; +// use crate::{result::JsResult, sys, types::JsValue}; + +// static KEY_NEON_CACHE: &str = "__neon_cache"; +// static KEY_INSTANCE_KEY: &str = "__instance_count"; + +// thread_local! { +// /// Basic unique key generation +// static COUNT: Lazy> = Lazy::new(|| Default::default()); +// static CACHE_KEY: OnceCell = OnceCell::default(); +// } + +// /// Reference counted JavaScript value with a static lifetime for use in async closures +// pub struct JsRc { +// pub(crate) raw_env: Env, +// pub(crate) count: Rc>, +// pub(crate) inner_key: Rc, +// pub(crate) inner: RefCell>, +// // _p: PhantomData, +// } + +// impl JsRc { +// pub(crate) fn new<'a>( +// cx: &mut impl Context<'a>, +// value: Handle<'a, T>, +// ) -> NeonResult> { +// let raw_env = cx.boxed(); +// let inner_key = set_ref(cx, value)?; + +// Ok(Self { +// raw_env, +// count: Rc::new(RefCell::new(1)), +// inner_key: Rc::new(inner_key), +// inner: Default::default(), +// }) +// } + +// pub fn clone<'a>(&self, cx: impl Context<'a>) -> Result, ()> { +// todo!(); +// } +// } + +// impl Drop for JsRc { +// fn drop(&mut self) {} +// } + +// impl Deref for JsRc { +// type Target = T; + +// fn deref(&self) -> &T { +// let global_this = JsObject::build(|out| unsafe { +// sys::scope::get_global(self.raw_env.to_raw(), out); +// }); +// // if self.inner.borrow().is_none() { +// // unsafe { +// // sys::error::throw(self.env.to_raw(), v.to_local()); +// // Err(Throw::new()) +// // } +// // let cx = ExecuteContext::from(value); +// // let value = +// // } +// todo!(); +// // &self.inner +// } +// } + +// /* +// globalThis = { +// __napi_cache: { +// __instance_count: number, +// [key: number]: Record +// } +// } + +// Note: Is there a way to store this privately in the module scope? +// */ +// fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { +// let global_this = cx.global_object(); + +// let neon_cache = { +// let neon_cache = global_this.get_opt::(cx, KEY_NEON_CACHE)?; +// if let Some(neon_cache) = neon_cache { +// neon_cache +// } else { +// let neon_cache = cx.empty_object(); +// let initial_count = cx.number(0); +// neon_cache.set(cx, KEY_INSTANCE_KEY, initial_count)?; +// global_this.set(cx, KEY_NEON_CACHE, neon_cache); +// global_this.get::(cx, KEY_NEON_CACHE)? +// } +// }; + +// let instance_count = CACHE_KEY.with(|key| { +// key.get_or_try_init(|| -> NeonResult { +// let instance_count = global_this.get::(cx, KEY_NEON_CACHE)?; +// let instance_count = instance_count.value(cx) as u32; +// let instance_count = instance_count + 1; +// let instance_count_js = cx.number(instance_count); +// global_this.set::<_, _, JsNumber>(cx, KEY_NEON_CACHE, instance_count_js)?; +// Ok(instance_count) +// }) +// .cloned() +// })?; + +// neon_cache.get(cx, instance_count) +// } + +// fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> NeonResult { +// let neon_cache = get_cache(cx)?; + +// let key_raw = COUNT.with(|c| { +// let mut c = c.borrow_mut(); +// let current = c.clone(); +// *c += 1; +// current +// }); + +// let key = cx.number(key_raw); +// neon_cache.set(cx, key, value)?; +// Ok(key_raw) +// } + +// fn get_ref<'a, T: Value>(cx: &mut impl Context<'a>, key: &u32) -> JsResult<'a, T> { +// let neon_cache = get_cache(cx)?; +// let key = cx.number(key.clone()); +// neon_cache.get(cx, key) +// } + +// fn remove_ref<'a>(cx: &mut impl Context<'a>, key: u32) -> NeonResult<()> { +// let neon_cache = get_cache(cx)?; +// let key = cx.number(key); +// let value = cx.undefined(); +// neon_cache.set(cx, key, value)?; +// Ok(()) +// } diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index ee6a91a9c..f9b63a933 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -49,6 +49,7 @@ pub(crate) mod internal; pub(crate) mod root; +pub(crate) mod js_rc; use std::{ error::Error, @@ -59,6 +60,7 @@ use std::{ }; pub use self::root::Root; +pub use self::js_rc::*; use crate::{ context::Context, diff --git a/crates/neon/src/lib.rs b/crates/neon/src/lib.rs index fb6220219..becf920e7 100644 --- a/crates/neon/src/lib.rs +++ b/crates/neon/src/lib.rs @@ -78,8 +78,8 @@ //! [supported]: https://github.com/neon-bindings/neon#platform-support #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(feature = "asynch")] -mod asynch; +#[cfg(feature = "async_local")] +mod async_local; pub mod context; pub mod event; pub mod handle; From 2b0070f0eeae5f5fca6b963afcee2bbcfb5edfd3 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 11:00:56 +1000 Subject: [PATCH 3/9] implementation backed by threadsafe functions --- crates/neon/src/async_local/mod.rs | 62 +++++++++---------- crates/neon/src/async_local/runtime.rs | 52 ++++++++++++++++ .../src/async_local/runtime/futures_count.rs | 35 ----------- crates/neon/src/async_local/runtime/mod.rs | 7 --- .../neon/src/async_local/runtime/runtime.rs | 27 -------- .../src/async_local/{runtime => }/waker.rs | 32 ++++++---- crates/neon/src/context/mod.rs | 12 ++-- crates/neon/src/sys/bindings/functions.rs | 5 -- 8 files changed, 107 insertions(+), 125 deletions(-) create mode 100644 crates/neon/src/async_local/runtime.rs delete mode 100644 crates/neon/src/async_local/runtime/futures_count.rs delete mode 100644 crates/neon/src/async_local/runtime/mod.rs delete mode 100644 crates/neon/src/async_local/runtime/runtime.rs rename crates/neon/src/async_local/{runtime => }/waker.rs (69%) diff --git a/crates/neon/src/async_local/mod.rs b/crates/neon/src/async_local/mod.rs index 3893e9d94..17f7a0ed2 100644 --- a/crates/neon/src/async_local/mod.rs +++ b/crates/neon/src/async_local/mod.rs @@ -1,53 +1,49 @@ mod executor; mod runtime; +mod waker; use std::future::Future; -use self::runtime::LocalFuturesCount; +use executor::ThreadNotifyRef; +use waker::LocalWaker; +use waker::WakerEvent; + use self::runtime::LocalRuntime; use crate::context::Context; use crate::sys; -use crate::types::JsFunction; -pub(crate) fn spawn_async_local<'a>( +/// Schedule a future to run asynchronously on the local JavaScript thread. +/// The future's execution will not block the local thread. +pub fn spawn_async_local<'a>( cx: &mut impl Context<'a>, - future: impl Future + 'static, + future: impl Future + 'static, ) -> Result<(), ()> { - let future = async move { - future.await; - LocalFuturesCount::decrement(); - }; - - LocalRuntime::spawn_local(future).unwrap(); + // Add a future to the future pool to be executed + // whenever the Nodejs event loop is free to do so + LocalRuntime::queue_future(future).unwrap(); - // If there are tasks in flight then the - // executor is already initialized - if LocalFuturesCount::count() != 0 { + // If there are tasks in flight then the executor + // is already running and should be reused + if LocalRuntime::futures_count() > 1 { return Ok(()); } - // Start the futures digest cycle + // The futures executor runs on another thread and will + // use a threadsafe function to call schedule work + // on the JavaScript thread let env_raw = cx.env().to_raw(); - unsafe { sys::fun::new(env_raw, "", |env, _| { - - }) }; - - // unsafe { - // sys::create_threadsafe_function( - // env_raw, - // func, - // async_resource, - // async_resource_name, - // max_queue_size, - // initial_thread_count, - // thread_finalize_data, - // thread_finalize_cb, - // context, - // call_js_cb, - // result, - // ) - // } + LocalWaker::send(WakerEvent::Init(unsafe { + sys::tsfn::ThreadsafeFunction::::new(env_raw, |_, thread_notify| { + let done = LocalRuntime::run_until_stalled(thread_notify); + + if done { + LocalWaker::send(WakerEvent::Done); + } else { + LocalWaker::send(WakerEvent::Next); + } + }) + })); Ok(()) } diff --git a/crates/neon/src/async_local/runtime.rs b/crates/neon/src/async_local/runtime.rs new file mode 100644 index 000000000..f1a49d22d --- /dev/null +++ b/crates/neon/src/async_local/runtime.rs @@ -0,0 +1,52 @@ +use std::cell::RefCell; +use std::future::Future; + +use super::executor::LocalPool; +use super::executor::LocalSpawner; +use super::executor::ThreadNotifyRef; +use futures::task::LocalSpawnExt; +use futures::task::SpawnError; +use once_cell::unsync::Lazy; + +thread_local! { + static LOCAL_POOL: Lazy> = Lazy::default(); + static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner())); + static FUTURES_COUNT: Lazy> = Lazy::default(); +} + +pub struct LocalRuntime; + +impl LocalRuntime { + pub fn futures_count() -> usize { + Self::count() + } + + pub fn queue_future(future: impl Future + 'static) -> Result<(), SpawnError> { + Self::increment(); + SPAWNER.with(move |ls| ls.spawn_local(async move { + future.await; + Self::decrement(); + })) + } + + pub fn run_until_stalled(thread_notify: ThreadNotifyRef) -> bool { + LOCAL_POOL.with(move |lp| lp.borrow_mut().run_until_stalled(thread_notify)); + if Self::count() == 0 { + true + } else { + false + } + } + + fn count() -> usize { + FUTURES_COUNT.with(|c| *c.borrow_mut()) + } + + fn increment() { + FUTURES_COUNT.with(|c| *c.borrow_mut() += 1); + } + + fn decrement() { + FUTURES_COUNT.with(|c| *c.borrow_mut() -= 1); + } +} diff --git a/crates/neon/src/async_local/runtime/futures_count.rs b/crates/neon/src/async_local/runtime/futures_count.rs deleted file mode 100644 index 51f5fbcb8..000000000 --- a/crates/neon/src/async_local/runtime/futures_count.rs +++ /dev/null @@ -1,35 +0,0 @@ -use once_cell::unsync::Lazy; -use std::cell::RefCell; - -thread_local! { - static FUTURES_COUNT: Lazy> = Lazy::default(); -} - -/// Local counter of the futures running on the current thread -/// -/// By default the futures executor will keep the Nodejs process open. -/// This is desirable to avoid Nodejs from exiting before the futures -/// complete their execution. -/// -/// For this reason, we need to keep a local counter of how many futures -/// are currently in flight so we can unref the executor when all of -/// the promises are settled. -pub struct LocalFuturesCount; - -impl LocalFuturesCount { - pub fn count() -> usize { - FUTURES_COUNT.with(|c| *c.borrow_mut()) - } - - pub fn increment() -> usize { - let futures_count = LocalFuturesCount::count(); - FUTURES_COUNT.with(|c| *c.borrow_mut() += 1); - futures_count - } - - pub fn decrement() -> usize { - let futures_count = LocalFuturesCount::count(); - FUTURES_COUNT.with(|c| *c.borrow_mut() -= 1); - futures_count - } -} diff --git a/crates/neon/src/async_local/runtime/mod.rs b/crates/neon/src/async_local/runtime/mod.rs deleted file mode 100644 index 2288d15cd..000000000 --- a/crates/neon/src/async_local/runtime/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -mod futures_count; -mod runtime; -mod waker; - -pub (super) use futures_count::*; -pub (super) use runtime::*; -pub (super) use waker::*; diff --git a/crates/neon/src/async_local/runtime/runtime.rs b/crates/neon/src/async_local/runtime/runtime.rs deleted file mode 100644 index 342980317..000000000 --- a/crates/neon/src/async_local/runtime/runtime.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::cell::RefCell; -use std::future::Future; - -use super::super::executor::LocalPool; -use super::super::executor::LocalSpawner; -use super::super::executor::ThreadNotifyRef; -use futures::task::LocalSpawnExt; -use futures::task::SpawnError; -use once_cell::unsync::Lazy; - -thread_local! { - static LOCAL_POOL: Lazy> = Lazy::default(); - static SPAWNER: Lazy = Lazy::new(|| LOCAL_POOL.with(|ex| ex.borrow().spawner())); -} - -pub struct LocalRuntime; - -impl LocalRuntime { - pub fn spawn_local(future: impl Future + 'static) -> Result<(), SpawnError> { - SPAWNER.with(move |ls| ls.spawn_local(future)) - } - - pub fn run_until_stalled(thread_notify: ThreadNotifyRef) { - LOCAL_POOL.with(move |lp| lp.borrow_mut().run_until_stalled(thread_notify)); - } -} - diff --git a/crates/neon/src/async_local/runtime/waker.rs b/crates/neon/src/async_local/waker.rs similarity index 69% rename from crates/neon/src/async_local/runtime/waker.rs rename to crates/neon/src/async_local/waker.rs index 403667856..70198294f 100644 --- a/crates/neon/src/async_local/runtime/waker.rs +++ b/crates/neon/src/async_local/waker.rs @@ -1,18 +1,19 @@ use std::sync::mpsc::channel; -use std::sync::mpsc::SendError; use std::sync::mpsc::Sender; -use std::sync::Arc; use std::thread; -use super::super::executor::wait_for_wake; -use super::super::executor::ThreadNotify; +use crate::sys; + +use super::executor::wait_for_wake; +use super::executor::ThreadNotify; +use super::executor::ThreadNotifyRef; use once_cell::unsync::Lazy; thread_local! { static WAKER_THREAD: Lazy> = Lazy::new(LocalWaker::start_waker_thread); } -pub type WakerInit = Arc; +pub type WakerInit = sys::tsfn::ThreadsafeFunction; pub enum WakerEvent { Init(WakerInit), @@ -33,11 +34,13 @@ pub enum WakerEvent { /// /// This allows for the execution of Rust futures to integrate with the /// Nodejs event loop without blocking either -pub struct LocalWaker; +pub struct LocalWaker; impl LocalWaker { - pub fn send(event: WakerEvent) -> Result<(), SendError> { - WAKER_THREAD.with(|tx| tx.send(event)) + pub fn send(event: WakerEvent) { + WAKER_THREAD + .with(|tx| tx.send(event)) + .expect("Unable to communicate with waker"); } fn start_waker_thread() -> Sender { @@ -52,13 +55,19 @@ impl LocalWaker { match event { WakerEvent::Init(incoming) => { if handle.replace(incoming).is_some() { - // Error + panic!("Handle already init"); + }; + let Some(ref handle) = handle else { + panic!("No handle"); }; - // Call JS + handle.call(thread_notify.clone(), None).ok(); } WakerEvent::Next => { wait_for_wake(&thread_notify); - // Call JS + let Some(ref handle) = handle else { + panic!("No handle"); + }; + handle.call(thread_notify.clone(), None).ok(); } WakerEvent::Done => { if let Some(handle) = handle.take() { @@ -72,4 +81,3 @@ impl LocalWaker { tx } } - diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index a520a89b4..186f94556 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -305,12 +305,12 @@ pub trait Context<'a>: ContextInternal<'a> { let env = self.env(); - // spawn_async_local(&env, async move { - // let scope = unsafe { HandleScope::new(env.to_raw()) }; - // let future = f(AsyncContext { env }); - // future.await; - // drop(scope); - // }); + crate::async_local::spawn_async_local(self, async move { + // let scope = unsafe { HandleScope::new(env.to_raw()) }; + let future = f(AsyncContext { env }); + future.await; + // drop(scope); + }).unwrap(); } /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. diff --git a/crates/neon/src/sys/bindings/functions.rs b/crates/neon/src/sys/bindings/functions.rs index c89be36ac..55bc5038b 100644 --- a/crates/neon/src/sys/bindings/functions.rs +++ b/crates/neon/src/sys/bindings/functions.rs @@ -266,11 +266,6 @@ mod napi4 { generate!( #[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] extern "C" { - fn get_uv_event_loop( - env: Env, - uv_loop: *mut UvEventLoop, - ); - fn create_threadsafe_function( env: Env, func: Value, From 6f3a670d9d343488e9b61715583ad81904834ad1 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 11:19:19 +1000 Subject: [PATCH 4/9] root_global --- crates/neon/src/context/mod.rs | 4 - crates/neon/src/handle/js_rc.rs | 146 -------------------------- crates/neon/src/handle/mod.rs | 4 +- crates/neon/src/handle/root_global.rs | 126 ++++++++++++++++++++++ crates/neon/src/sys/bindings/mod.rs | 3 +- crates/neon/src/sys/bindings/types.rs | 13 --- 6 files changed, 129 insertions(+), 167 deletions(-) delete mode 100644 crates/neon/src/handle/js_rc.rs create mode 100644 crates/neon/src/handle/root_global.rs diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index 186f94556..4d156fe0a 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -499,10 +499,6 @@ pub trait Context<'a>: ContextInternal<'a> { JsBox::new(self, v) } - // fn rc() -> Handle<> { - - // } - #[cfg(feature = "napi-4")] #[deprecated(since = "0.9.0", note = "Please use the channel() method instead")] #[doc(hidden)] diff --git a/crates/neon/src/handle/js_rc.rs b/crates/neon/src/handle/js_rc.rs deleted file mode 100644 index 21ee76cb4..000000000 --- a/crates/neon/src/handle/js_rc.rs +++ /dev/null @@ -1,146 +0,0 @@ -// use once_cell::unsync::{Lazy, OnceCell}; -// use std::ops::Deref; -// use std::rc::Rc; -// use std::{cell::RefCell, marker::PhantomData}; - -// use super::Value; -// use crate::context::internal::Env; -// use crate::context::{Context, ExecuteContext}; -// use crate::object::Object; -// use crate::prelude::Handle; -// use crate::result::NeonResult; -// use crate::types::{JsNumber, JsObject}; -// use crate::{result::JsResult, sys, types::JsValue}; - -// static KEY_NEON_CACHE: &str = "__neon_cache"; -// static KEY_INSTANCE_KEY: &str = "__instance_count"; - -// thread_local! { -// /// Basic unique key generation -// static COUNT: Lazy> = Lazy::new(|| Default::default()); -// static CACHE_KEY: OnceCell = OnceCell::default(); -// } - -// /// Reference counted JavaScript value with a static lifetime for use in async closures -// pub struct JsRc { -// pub(crate) raw_env: Env, -// pub(crate) count: Rc>, -// pub(crate) inner_key: Rc, -// pub(crate) inner: RefCell>, -// // _p: PhantomData, -// } - -// impl JsRc { -// pub(crate) fn new<'a>( -// cx: &mut impl Context<'a>, -// value: Handle<'a, T>, -// ) -> NeonResult> { -// let raw_env = cx.boxed(); -// let inner_key = set_ref(cx, value)?; - -// Ok(Self { -// raw_env, -// count: Rc::new(RefCell::new(1)), -// inner_key: Rc::new(inner_key), -// inner: Default::default(), -// }) -// } - -// pub fn clone<'a>(&self, cx: impl Context<'a>) -> Result, ()> { -// todo!(); -// } -// } - -// impl Drop for JsRc { -// fn drop(&mut self) {} -// } - -// impl Deref for JsRc { -// type Target = T; - -// fn deref(&self) -> &T { -// let global_this = JsObject::build(|out| unsafe { -// sys::scope::get_global(self.raw_env.to_raw(), out); -// }); -// // if self.inner.borrow().is_none() { -// // unsafe { -// // sys::error::throw(self.env.to_raw(), v.to_local()); -// // Err(Throw::new()) -// // } -// // let cx = ExecuteContext::from(value); -// // let value = -// // } -// todo!(); -// // &self.inner -// } -// } - -// /* -// globalThis = { -// __napi_cache: { -// __instance_count: number, -// [key: number]: Record -// } -// } - -// Note: Is there a way to store this privately in the module scope? -// */ -// fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { -// let global_this = cx.global_object(); - -// let neon_cache = { -// let neon_cache = global_this.get_opt::(cx, KEY_NEON_CACHE)?; -// if let Some(neon_cache) = neon_cache { -// neon_cache -// } else { -// let neon_cache = cx.empty_object(); -// let initial_count = cx.number(0); -// neon_cache.set(cx, KEY_INSTANCE_KEY, initial_count)?; -// global_this.set(cx, KEY_NEON_CACHE, neon_cache); -// global_this.get::(cx, KEY_NEON_CACHE)? -// } -// }; - -// let instance_count = CACHE_KEY.with(|key| { -// key.get_or_try_init(|| -> NeonResult { -// let instance_count = global_this.get::(cx, KEY_NEON_CACHE)?; -// let instance_count = instance_count.value(cx) as u32; -// let instance_count = instance_count + 1; -// let instance_count_js = cx.number(instance_count); -// global_this.set::<_, _, JsNumber>(cx, KEY_NEON_CACHE, instance_count_js)?; -// Ok(instance_count) -// }) -// .cloned() -// })?; - -// neon_cache.get(cx, instance_count) -// } - -// fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> NeonResult { -// let neon_cache = get_cache(cx)?; - -// let key_raw = COUNT.with(|c| { -// let mut c = c.borrow_mut(); -// let current = c.clone(); -// *c += 1; -// current -// }); - -// let key = cx.number(key_raw); -// neon_cache.set(cx, key, value)?; -// Ok(key_raw) -// } - -// fn get_ref<'a, T: Value>(cx: &mut impl Context<'a>, key: &u32) -> JsResult<'a, T> { -// let neon_cache = get_cache(cx)?; -// let key = cx.number(key.clone()); -// neon_cache.get(cx, key) -// } - -// fn remove_ref<'a>(cx: &mut impl Context<'a>, key: u32) -> NeonResult<()> { -// let neon_cache = get_cache(cx)?; -// let key = cx.number(key); -// let value = cx.undefined(); -// neon_cache.set(cx, key, value)?; -// Ok(()) -// } diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index f9b63a933..0cde6c4e4 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -49,7 +49,7 @@ pub(crate) mod internal; pub(crate) mod root; -pub(crate) mod js_rc; +pub(crate) mod root_global; use std::{ error::Error, @@ -60,7 +60,7 @@ use std::{ }; pub use self::root::Root; -pub use self::js_rc::*; +pub use self::root_global::*; use crate::{ context::Context, diff --git a/crates/neon/src/handle/root_global.rs b/crates/neon/src/handle/root_global.rs new file mode 100644 index 000000000..26ea7337d --- /dev/null +++ b/crates/neon/src/handle/root_global.rs @@ -0,0 +1,126 @@ +use once_cell::unsync::Lazy; +use once_cell::unsync::OnceCell; +use std::cell::RefCell; +use std::rc::Rc; + +use super::Value; +use crate::context::Context; +use crate::object::Object; +use crate::prelude::Handle; +use crate::result::JsResult; +use crate::result::NeonResult; +use crate::types::JsNumber; +use crate::types::JsObject; + +static KEY_NEON_CACHE: &str = "__neon_cache"; +static KEY_INSTANCE_KEY: &str = "__instance_count"; + +thread_local! { + /// Basic unique key generation + static COUNT: Lazy> = Lazy::new(|| Default::default()); + static CACHE_KEY: OnceCell = OnceCell::default(); +} + +/// Reference counted JavaScript value with a static lifetime for use in async closures +pub struct RootGlobal { + pub(crate) count: Rc>, + pub(crate) inner_key: Rc, + pub(crate) inner: RefCell>, +} + +impl RootGlobal { + pub(crate) fn new<'a>( + cx: &mut impl Context<'a>, + value: Handle<'a, T>, + ) -> NeonResult> { + let inner_key = set_ref(cx, value)?; + + Ok(Self { + count: Rc::new(RefCell::new(1)), + inner_key: Rc::new(inner_key), + inner: Default::default(), + }) + } + + pub fn clone<'a>(&self, cx: impl Context<'a>) -> Result, ()> { + todo!(); + } + + pub fn deref<'a>(&self, cx: impl Context<'a>) -> Result { + todo!(); + } + + pub fn drop<'a>(&self, cx: impl Context<'a>) -> Result<(), ()> { + todo!(); + } +} + +/* + globalThis = { + __napi_cache: { + __instance_count: number, + [key: number]: Record + } + } + + Note: Is there a way to store this privately in the module scope? +*/ +fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { + let global_this = cx.global_object(); + + let neon_cache = { + let neon_cache = global_this.get_opt::(cx, KEY_NEON_CACHE)?; + if let Some(neon_cache) = neon_cache { + neon_cache + } else { + let neon_cache = cx.empty_object(); + let initial_count = cx.number(0); + neon_cache.set(cx, KEY_INSTANCE_KEY, initial_count)?; + global_this.set(cx, KEY_NEON_CACHE, neon_cache)?; + global_this.get::(cx, KEY_NEON_CACHE)? + } + }; + + let instance_count = CACHE_KEY.with(|key| { + key.get_or_try_init(|| -> NeonResult { + let instance_count = global_this.get::(cx, KEY_NEON_CACHE)?; + let instance_count = instance_count.value(cx) as u32; + let instance_count = instance_count + 1; + let instance_count_js = cx.number(instance_count); + global_this.set::<_, _, JsNumber>(cx, KEY_NEON_CACHE, instance_count_js)?; + Ok(instance_count) + }) + .cloned() + })?; + + neon_cache.get(cx, instance_count) +} + +fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> NeonResult { + let neon_cache = get_cache(cx)?; + + let key_raw = COUNT.with(|c| { + let mut c = c.borrow_mut(); + let current = c.clone(); + *c += 1; + current + }); + + let key = cx.number(key_raw); + neon_cache.set(cx, key, value)?; + Ok(key_raw) +} + +fn get_ref<'a, T: Value>(cx: &mut impl Context<'a>, key: &u32) -> JsResult<'a, T> { + let neon_cache = get_cache(cx)?; + let key = cx.number(key.clone()); + neon_cache.get(cx, key) +} + +fn remove_ref<'a>(cx: &mut impl Context<'a>, key: u32) -> NeonResult<()> { + let neon_cache = get_cache(cx)?; + let key = cx.number(key); + let value = cx.undefined(); + neon_cache.set(cx, key, value)?; + Ok(()) +} diff --git a/crates/neon/src/sys/bindings/mod.rs b/crates/neon/src/sys/bindings/mod.rs index 3b806fe4e..cdd8559e1 100644 --- a/crates/neon/src/sys/bindings/mod.rs +++ b/crates/neon/src/sys/bindings/mod.rs @@ -42,7 +42,7 @@ macro_rules! napi_name { /// ```ignore /// extern "C" { /// fn get_undefined(env: Env, result: *mut Value) -> Status; -/// /* Additional functions may be included */ +/// /* Additional functions may be included */ /// } /// ``` /// @@ -177,4 +177,3 @@ pub use self::{functions::*, types::*}; mod functions; mod types; -mod libuv; diff --git a/crates/neon/src/sys/bindings/types.rs b/crates/neon/src/sys/bindings/types.rs index d3b2aead0..a167c9282 100644 --- a/crates/neon/src/sys/bindings/types.rs +++ b/crates/neon/src/sys/bindings/types.rs @@ -66,19 +66,6 @@ pub struct Ref__ { /// [`napi_ref`](https://nodejs.org/api/n-api.html#napi_ref) pub type Ref = *mut Ref__; -#[cfg(feature = "napi-4")] -#[repr(C)] -#[derive(Debug, Copy, Clone)] -#[doc(hidden)] -pub struct UvEventLoop__ { - _unused: [u8; 0], -} - -#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] -#[cfg(feature = "napi-4")] -/// [`napi_threadsafe_function`](https://nodejs.org/api/n-api.html#napi_threadsafe_function) -pub type UvEventLoop = *mut UvEventLoop__; - #[cfg(feature = "napi-4")] #[repr(C)] #[derive(Debug, Copy, Clone)] From ff4180ae802aa278b199c32dc4ed10097b61bf48 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 13:36:22 +1000 Subject: [PATCH 5/9] Added symbol --- crates/neon/src/context/mod.rs | 12 +- crates/neon/src/handle/mod.rs | 6 +- crates/neon/src/handle/root_global.rs | 137 ++++++++++++---------- crates/neon/src/sys/bindings/functions.rs | 2 + crates/neon/src/sys/mod.rs | 1 + crates/neon/src/sys/symbol.rs | 12 ++ crates/neon/src/types_impl/mod.rs | 51 ++++++++ 7 files changed, 155 insertions(+), 66 deletions(-) create mode 100644 crates/neon/src/sys/symbol.rs diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index 4d156fe0a..fcc3d9fea 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -160,12 +160,7 @@ use crate::{ scope::{EscapableHandleScope, HandleScope}, }, types::{ - boxed::{Finalize, JsBox}, - error::JsError, - extract::FromArgs, - private::ValueInternal, - Deferred, JsArray, JsArrayBuffer, JsBoolean, JsBuffer, JsFunction, JsNull, JsNumber, - JsObject, JsPromise, JsString, JsUndefined, JsValue, StringResult, Value, + boxed::{Finalize, JsBox}, error::JsError, extract::FromArgs, private::ValueInternal, Deferred, JsArray, JsArrayBuffer, JsBoolean, JsBuffer, JsFunction, JsNull, JsNumber, JsObject, JsPromise, JsString, JsSymbol, JsUndefined, JsValue, StringResult, Value }, }; @@ -359,6 +354,11 @@ pub trait Context<'a>: ContextInternal<'a> { JsNumber::new(self, x.into()) } + /// Convenience method for creating a `JsSymbol` value. + fn symbol>(&mut self, d: D) -> Handle<'a, JsSymbol> { + JsSymbol::new(self, d) + } + /// Convenience method for creating a `JsString` value. /// /// If the string exceeds the limits of the JS engine, this method panics. diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index 0cde6c4e4..e8564abec 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -65,7 +65,7 @@ pub use self::root_global::*; use crate::{ context::Context, handle::internal::{SuperType, TransparentNoCopyWrapper}, - result::{JsResult, ResultExt}, + result::{JsResult, NeonResult, ResultExt}, sys, types::Value, }; @@ -95,6 +95,10 @@ impl<'a, V: Value + 'a> Handle<'a, V> { phantom: PhantomData, } } + + pub fn root_global(self, cx: &mut impl Context<'a>) -> NeonResult> { + RootGlobal::new(cx, self) + } } /// An error representing a failed downcast. diff --git a/crates/neon/src/handle/root_global.rs b/crates/neon/src/handle/root_global.rs index 26ea7337d..8213ce2c1 100644 --- a/crates/neon/src/handle/root_global.rs +++ b/crates/neon/src/handle/root_global.rs @@ -1,31 +1,36 @@ use once_cell::unsync::Lazy; use once_cell::unsync::OnceCell; use std::cell::RefCell; +use std::marker::PhantomData; use std::rc::Rc; +use super::TransparentNoCopyWrapper; use super::Value; use crate::context::Context; use crate::object::Object; use crate::prelude::Handle; use crate::result::JsResult; use crate::result::NeonResult; +use crate::sys::Value__; +use crate::types::private::ValueInternal; +use crate::types::JsError; +use crate::types::JsFunction; use crate::types::JsNumber; use crate::types::JsObject; +use crate::types::JsValue; static KEY_NEON_CACHE: &str = "__neon_cache"; -static KEY_INSTANCE_KEY: &str = "__instance_count"; thread_local! { - /// Basic unique key generation - static COUNT: Lazy> = Lazy::new(|| Default::default()); - static CACHE_KEY: OnceCell = OnceCell::default(); + // Symbol("__neon_cache") + static CACHE_SYMBOL: OnceCell<*mut Value__> = OnceCell::default(); } /// Reference counted JavaScript value with a static lifetime for use in async closures pub struct RootGlobal { pub(crate) count: Rc>, - pub(crate) inner_key: Rc, - pub(crate) inner: RefCell>, + pub(crate) inner: Rc<*mut Value__>, + _p: PhantomData, } impl RootGlobal { @@ -33,12 +38,10 @@ impl RootGlobal { cx: &mut impl Context<'a>, value: Handle<'a, T>, ) -> NeonResult> { - let inner_key = set_ref(cx, value)?; - Ok(Self { count: Rc::new(RefCell::new(1)), - inner_key: Rc::new(inner_key), - inner: Default::default(), + inner: Rc::new(set_ref(cx, value)?), + _p: Default::default(), }) } @@ -46,8 +49,11 @@ impl RootGlobal { todo!(); } - pub fn deref<'a>(&self, cx: impl Context<'a>) -> Result { - todo!(); + pub fn deref<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { + // TODO error handling + let env_raw = cx.env(); + let hydrated = unsafe { T::from_local(env_raw, *self.inner) }; + Ok(Handle::new_internal(hydrated)) } pub fn drop<'a>(&self, cx: impl Context<'a>) -> Result<(), ()> { @@ -57,70 +63,83 @@ impl RootGlobal { /* globalThis = { - __napi_cache: { - __instance_count: number, - [key: number]: Record - } + [key: Symbol("__neon_cache")]: Set } - - Note: Is there a way to store this privately in the module scope? */ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { let global_this = cx.global_object(); - let neon_cache = { - let neon_cache = global_this.get_opt::(cx, KEY_NEON_CACHE)?; - if let Some(neon_cache) = neon_cache { - neon_cache - } else { - let neon_cache = cx.empty_object(); - let initial_count = cx.number(0); - neon_cache.set(cx, KEY_INSTANCE_KEY, initial_count)?; - global_this.set(cx, KEY_NEON_CACHE, neon_cache)?; - global_this.get::(cx, KEY_NEON_CACHE)? + let neon_cache_symbol = CACHE_SYMBOL.with({ + |raw_value| { + raw_value + .get_or_try_init(|| -> NeonResult<*mut Value__> { + let symbol_ctor = global_this.get::(cx, "Symbol")?; + let set_ctor = global_this.get::(cx, "Set")?; + + let neon_cache = set_ctor.construct(cx, &[])?; + + let key = cx.string(KEY_NEON_CACHE); + let symbol: Handle = symbol_ctor.call_with(cx).arg(key).apply(cx)?; + let symbol_raw = symbol.to_local(); + + global_this.set(cx, symbol, neon_cache)?; + + Ok(symbol_raw) + }) + .cloned() } + })?; + + let neon_cache_symbol = + Handle::new_internal(unsafe { JsValue::from_local(cx.env(), neon_cache_symbol) }); + + let Some(neon_cache) = global_this.get_opt::(cx, neon_cache_symbol)? else { + return Err(cx.throw_error("Unable to find cache")?); }; - let instance_count = CACHE_KEY.with(|key| { - key.get_or_try_init(|| -> NeonResult { - let instance_count = global_this.get::(cx, KEY_NEON_CACHE)?; - let instance_count = instance_count.value(cx) as u32; - let instance_count = instance_count + 1; - let instance_count_js = cx.number(instance_count); - global_this.set::<_, _, JsNumber>(cx, KEY_NEON_CACHE, instance_count_js)?; - Ok(instance_count) - }) - .cloned() - })?; + console_log(cx, &neon_cache); + console_log(cx, &neon_cache_symbol); - neon_cache.get(cx, instance_count) + Ok(neon_cache) } -fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle) -> NeonResult { +fn set_ref<'a, V: Value>( + cx: &mut impl Context<'a>, + value: Handle<'a, V>, +) -> NeonResult<*mut Value__> { let neon_cache = get_cache(cx)?; + let value_raw = value.to_local(); - let key_raw = COUNT.with(|c| { - let mut c = c.borrow_mut(); - let current = c.clone(); - *c += 1; - current - }); + get_cache(cx)? + .get::(cx, "add")? + .call_with(cx) + .this(neon_cache) + .arg(value) + .exec(cx)?; - let key = cx.number(key_raw); - neon_cache.set(cx, key, value)?; - Ok(key_raw) + Ok(value_raw) } -fn get_ref<'a, T: Value>(cx: &mut impl Context<'a>, key: &u32) -> JsResult<'a, T> { +fn delete_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult<()> { let neon_cache = get_cache(cx)?; - let key = cx.number(key.clone()); - neon_cache.get(cx, key) -} -fn remove_ref<'a>(cx: &mut impl Context<'a>, key: u32) -> NeonResult<()> { - let neon_cache = get_cache(cx)?; - let key = cx.number(key); - let value = cx.undefined(); - neon_cache.set(cx, key, value)?; + get_cache(cx)? + .get::(cx, "delete")? + .call_with(cx) + .this(neon_cache) + .arg(value) + .exec(cx)?; + Ok(()) } + +fn console_log<'a, V: Value>(cx: &mut impl Context<'a>, v: &Handle<'a, V>) { + cx.global::("console") + .unwrap() + .get::(cx, "log") + .unwrap() + .call_with(cx) + .arg(*v) + .exec(cx) + .unwrap(); +} diff --git a/crates/neon/src/sys/bindings/functions.rs b/crates/neon/src/sys/bindings/functions.rs index 55bc5038b..f7fda40c0 100644 --- a/crates/neon/src/sys/bindings/functions.rs +++ b/crates/neon/src/sys/bindings/functions.rs @@ -81,6 +81,8 @@ mod napi1 { fn create_range_error(env: Env, code: Value, msg: Value, result: *mut Value) -> Status; + fn create_symbol(env: Env, description: Value, result: *mut Value) -> Status; + fn create_string_utf8( env: Env, str: *const c_char, diff --git a/crates/neon/src/sys/mod.rs b/crates/neon/src/sys/mod.rs index 10bdde4fc..5f730d68f 100644 --- a/crates/neon/src/sys/mod.rs +++ b/crates/neon/src/sys/mod.rs @@ -89,6 +89,7 @@ pub(crate) mod raw; pub(crate) mod reference; pub(crate) mod scope; pub(crate) mod string; +pub(crate) mod symbol; pub(crate) mod tag; pub(crate) mod typedarray; diff --git a/crates/neon/src/sys/symbol.rs b/crates/neon/src/sys/symbol.rs new file mode 100644 index 000000000..0340c1d16 --- /dev/null +++ b/crates/neon/src/sys/symbol.rs @@ -0,0 +1,12 @@ +use std::{mem::MaybeUninit, ptr}; + +use super::{ + bindings as napi, + raw::{Env, Local}, +}; + +pub unsafe fn new(out: &mut Local, env: Env, description: Local) -> bool { + let status = napi::create_symbol(env, description, out); + + status == napi::Status::Ok +} diff --git a/crates/neon/src/types_impl/mod.rs b/crates/neon/src/types_impl/mod.rs index 3f8677e51..72cdc9f09 100644 --- a/crates/neon/src/types_impl/mod.rs +++ b/crates/neon/src/types_impl/mod.rs @@ -409,6 +409,57 @@ impl ValueInternal for JsBoolean { } } +/// The type of JavaScript +/// [symbol](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol) +/// primitives. +#[derive(Debug)] +#[repr(transparent)] +pub struct JsSymbol(raw::Local); + +unsafe impl TransparentNoCopyWrapper for JsSymbol { + type Inner = raw::Local; + + fn into_inner(self) -> Self::Inner { + self.0 + } +} + +impl Value for JsSymbol {} + +impl ValueInternal for JsSymbol { + fn name() -> &'static str { + "symbol" + } + + fn is_typeof(env: Env, other: &Other) -> bool { + true + // unsafe { sys::tag::is_string(env.to_raw(), other.to_local()) } + } + + fn to_local(&self) -> raw::Local { + self.0 + } + + unsafe fn from_local(_env: Env, h: raw::Local) -> Self { + JsSymbol(h) + } +} + +impl JsSymbol { + pub fn new<'a, C: Context<'a>, S: AsRef>( + cx: &mut C, + description: S, + ) -> Handle<'a, JsSymbol> { + let description = JsString::new(cx, description); + + unsafe { + let mut local: raw::Local = std::mem::zeroed(); + sys::symbol::new(&mut local, cx.env().to_raw(), description.to_local()); + Handle::new_internal(JsSymbol(local)) + } + } +} + /// The type of JavaScript /// [string](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Data_structures#primitive_values) /// primitives. From ae96111ba76a6a12ac70bafa8fd409e135cad559 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 13:38:35 +1000 Subject: [PATCH 6/9] rootable symbol --- crates/neon/src/sys/symbol.rs | 2 -- crates/neon/src/types_impl/mod.rs | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/neon/src/sys/symbol.rs b/crates/neon/src/sys/symbol.rs index 0340c1d16..a0bf71c47 100644 --- a/crates/neon/src/sys/symbol.rs +++ b/crates/neon/src/sys/symbol.rs @@ -1,5 +1,3 @@ -use std::{mem::MaybeUninit, ptr}; - use super::{ bindings as napi, raw::{Env, Local}, diff --git a/crates/neon/src/types_impl/mod.rs b/crates/neon/src/types_impl/mod.rs index 72cdc9f09..efc0e837e 100644 --- a/crates/neon/src/types_impl/mod.rs +++ b/crates/neon/src/types_impl/mod.rs @@ -426,6 +426,8 @@ unsafe impl TransparentNoCopyWrapper for JsSymbol { impl Value for JsSymbol {} +impl Object for JsSymbol {} + impl ValueInternal for JsSymbol { fn name() -> &'static str { "symbol" From 6e6c683dc8e17bb8cc9273384d0a2328e7098ab1 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 14:03:19 +1000 Subject: [PATCH 7/9] Global root is a little better, still weird though --- crates/neon/src/handle/root_global.rs | 108 +++++++++++++------------- crates/neon/src/sys/tag.rs | 5 ++ crates/neon/src/types_impl/mod.rs | 3 +- 3 files changed, 61 insertions(+), 55 deletions(-) diff --git a/crates/neon/src/handle/root_global.rs b/crates/neon/src/handle/root_global.rs index 8213ce2c1..6c8e25ac6 100644 --- a/crates/neon/src/handle/root_global.rs +++ b/crates/neon/src/handle/root_global.rs @@ -1,35 +1,30 @@ -use once_cell::unsync::Lazy; use once_cell::unsync::OnceCell; use std::cell::RefCell; use std::marker::PhantomData; use std::rc::Rc; -use super::TransparentNoCopyWrapper; +use super::Root; use super::Value; use crate::context::Context; use crate::object::Object; use crate::prelude::Handle; use crate::result::JsResult; use crate::result::NeonResult; -use crate::sys::Value__; -use crate::types::private::ValueInternal; -use crate::types::JsError; use crate::types::JsFunction; -use crate::types::JsNumber; use crate::types::JsObject; -use crate::types::JsValue; +use crate::types::JsSymbol; static KEY_NEON_CACHE: &str = "__neon_cache"; thread_local! { // Symbol("__neon_cache") - static CACHE_SYMBOL: OnceCell<*mut Value__> = OnceCell::default(); + static CACHE_SYMBOL: OnceCell> = OnceCell::default(); } /// Reference counted JavaScript value with a static lifetime for use in async closures pub struct RootGlobal { pub(crate) count: Rc>, - pub(crate) inner: Rc<*mut Value__>, + pub(crate) inner: Rc, _p: PhantomData, } @@ -45,19 +40,31 @@ impl RootGlobal { }) } - pub fn clone<'a>(&self, cx: impl Context<'a>) -> Result, ()> { - todo!(); + pub fn clone<'a>(&self) -> RootGlobal { + let mut count = self.count.borrow_mut(); + *count += 1; + drop(count); + + Self { + count: self.count.clone(), + inner: self.inner.clone(), + _p: self._p.clone(), + } } - pub fn deref<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { - // TODO error handling - let env_raw = cx.env(); - let hydrated = unsafe { T::from_local(env_raw, *self.inner) }; - Ok(Handle::new_internal(hydrated)) + pub fn into_inner<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { + get_ref(cx, &*self.inner) } - pub fn drop<'a>(&self, cx: impl Context<'a>) -> Result<(), ()> { - todo!(); + pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> { + let mut count = self.count.borrow_mut(); + *count -= 1; + + if *count == 0 { + delete_ref(cx, &*self.inner)? + } + + Ok(()) } } @@ -72,74 +79,69 @@ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { let neon_cache_symbol = CACHE_SYMBOL.with({ |raw_value| { raw_value - .get_or_try_init(|| -> NeonResult<*mut Value__> { - let symbol_ctor = global_this.get::(cx, "Symbol")?; - let set_ctor = global_this.get::(cx, "Set")?; - + .get_or_try_init(|| -> NeonResult> { + let set_ctor = global_this.get::(cx, "Map")?; let neon_cache = set_ctor.construct(cx, &[])?; - let key = cx.string(KEY_NEON_CACHE); - let symbol: Handle = symbol_ctor.call_with(cx).arg(key).apply(cx)?; - let symbol_raw = symbol.to_local(); + let symbol = cx.symbol(KEY_NEON_CACHE); + let symbol = symbol.root(cx); - global_this.set(cx, symbol, neon_cache)?; + { + let symbol = symbol.clone(cx); + let symbol = symbol.into_inner(cx); + global_this.set(cx, symbol, neon_cache)?; + } - Ok(symbol_raw) + Ok(symbol) }) - .cloned() + .and_then(|e| Ok(e.clone(cx))) } })?; - let neon_cache_symbol = - Handle::new_internal(unsafe { JsValue::from_local(cx.env(), neon_cache_symbol) }); + let neon_cache_symbol = neon_cache_symbol.into_inner(cx); let Some(neon_cache) = global_this.get_opt::(cx, neon_cache_symbol)? else { return Err(cx.throw_error("Unable to find cache")?); }; - console_log(cx, &neon_cache); - console_log(cx, &neon_cache_symbol); - Ok(neon_cache) } -fn set_ref<'a, V: Value>( - cx: &mut impl Context<'a>, - value: Handle<'a, V>, -) -> NeonResult<*mut Value__> { +fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult { let neon_cache = get_cache(cx)?; - let value_raw = value.to_local(); + let key = format!("{:?}", value.to_local()); get_cache(cx)? - .get::(cx, "add")? + .get::(cx, "set")? .call_with(cx) .this(neon_cache) + .arg(cx.string(&key)) .arg(value) .exec(cx)?; - Ok(value_raw) + Ok(key) +} + +fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &str) -> JsResult<'a, V> { + let neon_cache = get_cache(cx)?; + + get_cache(cx)? + .get::(cx, "get")? + .call_with(cx) + .this(neon_cache) + .arg(cx.string(&key)) + .apply(cx) } -fn delete_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult<()> { +fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &str) -> NeonResult<()> { let neon_cache = get_cache(cx)?; get_cache(cx)? .get::(cx, "delete")? .call_with(cx) .this(neon_cache) - .arg(value) + .arg(cx.string(&key)) .exec(cx)?; Ok(()) } - -fn console_log<'a, V: Value>(cx: &mut impl Context<'a>, v: &Handle<'a, V>) { - cx.global::("console") - .unwrap() - .get::(cx, "log") - .unwrap() - .call_with(cx) - .arg(*v) - .exec(cx) - .unwrap(); -} diff --git a/crates/neon/src/sys/tag.rs b/crates/neon/src/sys/tag.rs index 91b2266a6..b918611b6 100644 --- a/crates/neon/src/sys/tag.rs +++ b/crates/neon/src/sys/tag.rs @@ -31,6 +31,11 @@ pub unsafe fn is_boolean(env: Env, val: Local) -> bool { is_type(env, val, napi::ValueType::Boolean) } +/// Is `val` a JavaScript symbol? +pub unsafe fn is_symbol(env: Env, val: Local) -> bool { + is_type(env, val, napi::ValueType::Symbol) +} + /// Is `val` a JavaScript string? pub unsafe fn is_string(env: Env, val: Local) -> bool { is_type(env, val, napi::ValueType::String) diff --git a/crates/neon/src/types_impl/mod.rs b/crates/neon/src/types_impl/mod.rs index efc0e837e..167c09bd8 100644 --- a/crates/neon/src/types_impl/mod.rs +++ b/crates/neon/src/types_impl/mod.rs @@ -434,8 +434,7 @@ impl ValueInternal for JsSymbol { } fn is_typeof(env: Env, other: &Other) -> bool { - true - // unsafe { sys::tag::is_string(env.to_raw(), other.to_local()) } + unsafe { sys::tag::is_symbol(env.to_raw(), other.to_local()) } } fn to_local(&self) -> raw::Local { From 5505668fedb856c778d20d5ca52061894b4a5272 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 14:17:19 +1000 Subject: [PATCH 8/9] root_value --- crates/neon/src/handle/mod.rs | 8 ++-- .../src/handle/{root.rs => root_object.rs} | 0 .../handle/{root_global.rs => root_value.rs} | 39 ++++--------------- crates/neon/src/lifecycle.rs | 2 +- 4 files changed, 12 insertions(+), 37 deletions(-) rename crates/neon/src/handle/{root.rs => root_object.rs} (100%) rename crates/neon/src/handle/{root_global.rs => root_value.rs} (72%) diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index e8564abec..f760d7129 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -48,8 +48,8 @@ pub(crate) mod internal; -pub(crate) mod root; -pub(crate) mod root_global; +pub(crate) mod root_object; +pub(crate) mod root_value; use std::{ error::Error, @@ -59,8 +59,8 @@ use std::{ ops::{Deref, DerefMut}, }; -pub use self::root::Root; -pub use self::root_global::*; +pub use self::root_object::Root; +pub use self::root_value::*; use crate::{ context::Context, diff --git a/crates/neon/src/handle/root.rs b/crates/neon/src/handle/root_object.rs similarity index 100% rename from crates/neon/src/handle/root.rs rename to crates/neon/src/handle/root_object.rs diff --git a/crates/neon/src/handle/root_global.rs b/crates/neon/src/handle/root_value.rs similarity index 72% rename from crates/neon/src/handle/root_global.rs rename to crates/neon/src/handle/root_value.rs index 6c8e25ac6..83aabb533 100644 --- a/crates/neon/src/handle/root_global.rs +++ b/crates/neon/src/handle/root_value.rs @@ -12,13 +12,10 @@ use crate::result::JsResult; use crate::result::NeonResult; use crate::types::JsFunction; use crate::types::JsObject; -use crate::types::JsSymbol; - -static KEY_NEON_CACHE: &str = "__neon_cache"; thread_local! { // Symbol("__neon_cache") - static CACHE_SYMBOL: OnceCell> = OnceCell::default(); + static NEON_CACHE: OnceCell> = OnceCell::default(); } /// Reference counted JavaScript value with a static lifetime for use in async closures @@ -68,47 +65,25 @@ impl RootGlobal { } } -/* - globalThis = { - [key: Symbol("__neon_cache")]: Set - } -*/ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { - let global_this = cx.global_object(); - - let neon_cache_symbol = CACHE_SYMBOL.with({ + let neon_cache = NEON_CACHE.with({ |raw_value| { raw_value - .get_or_try_init(|| -> NeonResult> { - let set_ctor = global_this.get::(cx, "Map")?; + .get_or_try_init(|| -> NeonResult> { + let set_ctor = cx.global_object().get::(cx, "Map")?; let neon_cache = set_ctor.construct(cx, &[])?; - - let symbol = cx.symbol(KEY_NEON_CACHE); - let symbol = symbol.root(cx); - - { - let symbol = symbol.clone(cx); - let symbol = symbol.into_inner(cx); - global_this.set(cx, symbol, neon_cache)?; - } - - Ok(symbol) + Ok(neon_cache.root(cx)) }) .and_then(|e| Ok(e.clone(cx))) } })?; - let neon_cache_symbol = neon_cache_symbol.into_inner(cx); - - let Some(neon_cache) = global_this.get_opt::(cx, neon_cache_symbol)? else { - return Err(cx.throw_error("Unable to find cache")?); - }; - - Ok(neon_cache) + Ok(neon_cache.into_inner(cx)) } fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult { let neon_cache = get_cache(cx)?; + // Is this safe? let key = format!("{:?}", value.to_local()); get_cache(cx)? diff --git a/crates/neon/src/lifecycle.rs b/crates/neon/src/lifecycle.rs index 0a08ef922..6399c2295 100644 --- a/crates/neon/src/lifecycle.rs +++ b/crates/neon/src/lifecycle.rs @@ -20,7 +20,7 @@ use std::{ use crate::{ context::Context, event::Channel, - handle::root::NapiRef, + handle::root_object::NapiRef, sys::{lifecycle, raw::Env, tsfn::ThreadsafeFunction}, types::promise::NodeApiDeferred, }; From b4482775a900be392962ab1a4c2086d7152839d5 Mon Sep 17 00:00:00 2001 From: David Alsh Date: Tue, 25 Jun 2024 15:32:51 +1000 Subject: [PATCH 9/9] Ok all working again --- crates/neon/Cargo.toml | 2 +- crates/neon/src/async_local/mod.rs | 25 +++++--- crates/neon/src/async_local/runtime.rs | 5 +- crates/neon/src/async_local/waker.rs | 19 +++--- crates/neon/src/context/mod.rs | 83 +++++++++++++------------- crates/neon/src/handle/mod.rs | 7 ++- crates/neon/src/handle/root_value.rs | 42 +++++++------ crates/neon/src/prelude.rs | 4 +- 8 files changed, 104 insertions(+), 83 deletions(-) diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index 493498714..2d95bc26d 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -44,7 +44,7 @@ features = ["sync"] optional = true [features] -default = ["napi-8", "futures"] +default = ["napi-8"] # Enable extracting values by serializing to JSON serde = ["dep:serde", "dep:serde_json"] diff --git a/crates/neon/src/async_local/mod.rs b/crates/neon/src/async_local/mod.rs index 17f7a0ed2..81f4ef21b 100644 --- a/crates/neon/src/async_local/mod.rs +++ b/crates/neon/src/async_local/mod.rs @@ -17,20 +17,31 @@ use crate::sys; pub fn spawn_async_local<'a>( cx: &mut impl Context<'a>, future: impl Future + 'static, -) -> Result<(), ()> { +) { // Add a future to the future pool to be executed // whenever the Nodejs event loop is free to do so - LocalRuntime::queue_future(future).unwrap(); + LocalRuntime::queue_future(future); // If there are tasks in flight then the executor // is already running and should be reused if LocalRuntime::futures_count() > 1 { - return Ok(()); + return; } - // The futures executor runs on another thread and will - // use a threadsafe function to call schedule work - // on the JavaScript thread + // The futures executor runs on the main thread thread but + // the waker runs on another thread. + // + // The main thread executor will run the contained futures + // and as soon as they stall (e.g. waiting for a channel, timer, etc), + // the executor will immediately yield back to the JavaScript event loop. + // + // This "parks" the executer, which normally means the thread + // is block - however we cannot do that here so instead, there + // is a sacrificial "waker" thread who's only job is to sleep/wake and + // signal to Nodejs that futures need to be run. + // + // The waker thread notifies the main thread of pending work by + // running the futures executor within a threadsafe function let env_raw = cx.env().to_raw(); LocalWaker::send(WakerEvent::Init(unsafe { @@ -44,6 +55,4 @@ pub fn spawn_async_local<'a>( } }) })); - - Ok(()) } diff --git a/crates/neon/src/async_local/runtime.rs b/crates/neon/src/async_local/runtime.rs index f1a49d22d..764ea77ac 100644 --- a/crates/neon/src/async_local/runtime.rs +++ b/crates/neon/src/async_local/runtime.rs @@ -5,7 +5,6 @@ use super::executor::LocalPool; use super::executor::LocalSpawner; use super::executor::ThreadNotifyRef; use futures::task::LocalSpawnExt; -use futures::task::SpawnError; use once_cell::unsync::Lazy; thread_local! { @@ -21,12 +20,12 @@ impl LocalRuntime { Self::count() } - pub fn queue_future(future: impl Future + 'static) -> Result<(), SpawnError> { + pub fn queue_future(future: impl Future + 'static) { Self::increment(); SPAWNER.with(move |ls| ls.spawn_local(async move { future.await; Self::decrement(); - })) + })).expect("Unable to spawn future on local pool"); } pub fn run_until_stalled(thread_notify: ThreadNotifyRef) -> bool { diff --git a/crates/neon/src/async_local/waker.rs b/crates/neon/src/async_local/waker.rs index 70198294f..b8ea50f81 100644 --- a/crates/neon/src/async_local/waker.rs +++ b/crates/neon/src/async_local/waker.rs @@ -22,18 +22,20 @@ pub enum WakerEvent { } /// The futures waker that coordinates with the futures executor to notify -/// the main thread to pause and resume execution of futures. +/// the main thread to resume execution of futures. /// /// The waker is implemented as a dedicated system thread which is parked -/// by the local futures executor while waiting for futures to resume work. +/// by the local futures executor. Futures (like channel, timers) will +/// call the wake() method Futures Waker trait. /// -/// Once woken up, the waker resumes execution of futures on the JavaScript -/// thread by triggering a napi threadsafe function to poll the futures in -/// the local pool until no more progress can be made before yielding back -/// to the Nodejs event loop. +/// This gives it some level of portability - for instance any utilities +/// from the "async_std" crate will work however most things from Tokio +/// won't work. /// -/// This allows for the execution of Rust futures to integrate with the -/// Nodejs event loop without blocking either +/// Once woken up, the waker resumes execution of futures on the JavaScript +/// thread by triggering a napi threadsafe function which executes a callback +/// that runs on the main JavaScript thread. This callback is used to poll +/// the futures in the local pool. pub struct LocalWaker; impl LocalWaker { @@ -46,7 +48,6 @@ impl LocalWaker { fn start_waker_thread() -> Sender { let (tx, rx) = channel(); - // Dedicated waker thread to use for waiting on pending futures thread::spawn(move || { let thread_notify = ThreadNotify::new(); let mut handle = None::; diff --git a/crates/neon/src/context/mod.rs b/crates/neon/src/context/mod.rs index fcc3d9fea..1941941f9 100644 --- a/crates/neon/src/context/mod.rs +++ b/crates/neon/src/context/mod.rs @@ -145,10 +145,10 @@ use std::{convert::Into, marker::PhantomData, panic::UnwindSafe}; pub use crate::types::buffer::lock::Lock; -// #[cfg(feature = "async_local")] -// use crate::async_local::{root::RootGlobal, spawn_async_local}; #[cfg(feature = "async_local")] use futures::Future; +#[cfg(feature = "async_local")] +use crate::handle::StaticHandle; use crate::{ event::TaskBuilder, @@ -290,22 +290,22 @@ pub trait Context<'a>: ContextInternal<'a> { result } + /// Execute a future on the local JavaScript thread. This does not block JavaScript execution. + /// + /// Note: Avoid doing heavy computation on the main thread. The intended use case for this is + /// waiting on channel receivers for data coming from other threads, waiting on timers and + /// handling async behaviors from JavaScript. #[cfg(feature = "async_local")] - fn execute_async_local(&mut self, f: F) + fn spawn_local(&mut self, f: F) where Fut: Future, F: FnOnce(AsyncContext) -> Fut + 'static, { - use futures::Future; - let env = self.env(); - crate::async_local::spawn_async_local(self, async move { - // let scope = unsafe { HandleScope::new(env.to_raw()) }; let future = f(AsyncContext { env }); future.await; - // drop(scope); - }).unwrap(); + }); } /// Executes a computation in a new memory management scope and computes a single result value that outlives the computation. @@ -623,29 +623,31 @@ impl<'a> ModuleContext<'a> { F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy, V: Value, { - // let wrapper = JsFunction::new(self, move |mut cx| { - // let mut args = vec![]; - - // while let Some(arg) = cx.argument_opt(args.len()) { - // let arg = arg.as_value(&mut cx); - // let arg = RootGlobal::new(&mut cx, arg); - // args.push(arg); - // } - - // let (deferred, promise) = cx.promise(); - // cx.execute_async_local(move |mut cx| async move { - // let acx = AsyncFunctionContext { - // env: cx.env(), - // arguments: args, - // }; - // deferred.resolve(&mut cx, f(acx).await.unwrap()); - // () - // }); - - // Ok(promise) - // })?; - - // self.exports.clone().set(self, key, wrapper)?; + use crate::handle::StaticHandle; + + let wrapper = JsFunction::new(self, move |mut cx| { + let mut args = vec![]; + + while let Some(arg) = cx.argument_opt(args.len()) { + let arg = arg.as_value(&mut cx); + let arg = StaticHandle::new(&mut cx, arg)?; + args.push(arg); + } + + let (deferred, promise) = cx.promise(); + cx.spawn_local(move |mut cx| async move { + let acx = AsyncFunctionContext { + env: cx.env(), + arguments: args, + }; + deferred.resolve(&mut cx, f(acx).await.unwrap()); + () + }); + + Ok(promise) + })?; + + self.exports.clone().set(self, key, wrapper)?; Ok(()) } @@ -851,16 +853,17 @@ impl<'a> Context<'a> for FunctionContext<'a> {} #[cfg(feature = "async_local")] pub struct AsyncFunctionContext { env: Env, - // arguments: Vec, + arguments: Vec>, } #[cfg(feature = "async_local")] impl<'a> AsyncFunctionContext { pub fn argument(&mut self, i: usize) -> JsResult<'a, V> { - // let arg = self.arguments.get(i).unwrap().clone(); - // let handle = arg.into_inner(self); - // Ok(handle) - todo!() + let arg = self.arguments.get(i).unwrap().clone(); + let arg = arg.from_static(self)?; + let value = unsafe { V::from_local(self.env(), arg.to_local()) }; + let handle = Handle::new_internal(value); + Ok(handle) } } @@ -877,9 +880,9 @@ impl<'a> Context<'a> for AsyncFunctionContext {} #[cfg(feature = "async_local")] impl Drop for AsyncFunctionContext { fn drop(&mut self) { - // while let Some(arg) = self.arguments.pop() { - // arg.remove(self); - // } + while let Some(arg) = self.arguments.pop() { + arg.drop(self).unwrap(); + } } } diff --git a/crates/neon/src/handle/mod.rs b/crates/neon/src/handle/mod.rs index f760d7129..7974257e2 100644 --- a/crates/neon/src/handle/mod.rs +++ b/crates/neon/src/handle/mod.rs @@ -96,8 +96,11 @@ impl<'a, V: Value + 'a> Handle<'a, V> { } } - pub fn root_global(self, cx: &mut impl Context<'a>) -> NeonResult> { - RootGlobal::new(cx, self) + /// Detaches the value from the Nodejs garbage collector + /// and manages the variable lifetime via reference counting. + /// Useful when interacting with a value within async closures + pub fn to_static(self, cx: &mut impl Context<'a>) -> NeonResult> { + StaticHandle::new(cx, self) } } diff --git a/crates/neon/src/handle/root_value.rs b/crates/neon/src/handle/root_value.rs index 83aabb533..ad838c4d2 100644 --- a/crates/neon/src/handle/root_value.rs +++ b/crates/neon/src/handle/root_value.rs @@ -12,24 +12,28 @@ use crate::result::JsResult; use crate::result::NeonResult; use crate::types::JsFunction; use crate::types::JsObject; +use crate::types::JsSymbol; + +// This creates a rooted object and stores javascript +// values on it as a way to grant any JavaScript value +// a static lifetime thread_local! { - // Symbol("__neon_cache") static NEON_CACHE: OnceCell> = OnceCell::default(); } /// Reference counted JavaScript value with a static lifetime for use in async closures -pub struct RootGlobal { +pub struct StaticHandle { pub(crate) count: Rc>, - pub(crate) inner: Rc, + pub(crate) inner: Rc>, _p: PhantomData, } -impl RootGlobal { +impl StaticHandle { pub(crate) fn new<'a>( cx: &mut impl Context<'a>, value: Handle<'a, T>, - ) -> NeonResult> { + ) -> NeonResult> { Ok(Self { count: Rc::new(RefCell::new(1)), inner: Rc::new(set_ref(cx, value)?), @@ -37,7 +41,7 @@ impl RootGlobal { }) } - pub fn clone<'a>(&self) -> RootGlobal { + pub fn clone(&self) -> StaticHandle { let mut count = self.count.borrow_mut(); *count += 1; drop(count); @@ -49,8 +53,8 @@ impl RootGlobal { } } - pub fn into_inner<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { - get_ref(cx, &*self.inner) + pub fn from_static<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> { + get_ref(cx, &self.inner) } pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> { @@ -58,7 +62,7 @@ impl RootGlobal { *count -= 1; if *count == 0 { - delete_ref(cx, &*self.inner)? + delete_ref(cx, &self.inner)? } Ok(()) @@ -81,41 +85,43 @@ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> { Ok(neon_cache.into_inner(cx)) } -fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult { +fn set_ref<'a, V: Value>( + cx: &mut impl Context<'a>, + value: Handle<'a, V>, +) -> NeonResult> { let neon_cache = get_cache(cx)?; - // Is this safe? - let key = format!("{:?}", value.to_local()); + let symbol = cx.symbol(format!("{:?}", value.to_local())).root(cx); get_cache(cx)? .get::(cx, "set")? .call_with(cx) .this(neon_cache) - .arg(cx.string(&key)) + .arg(symbol.clone(cx).into_inner(cx)) .arg(value) .exec(cx)?; - Ok(key) + Ok(symbol) } -fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &str) -> JsResult<'a, V> { +fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &Root) -> JsResult<'a, V> { let neon_cache = get_cache(cx)?; get_cache(cx)? .get::(cx, "get")? .call_with(cx) .this(neon_cache) - .arg(cx.string(&key)) + .arg(key.clone(cx).into_inner(cx)) .apply(cx) } -fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &str) -> NeonResult<()> { +fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &Root) -> NeonResult<()> { let neon_cache = get_cache(cx)?; get_cache(cx)? .get::(cx, "delete")? .call_with(cx) .this(neon_cache) - .arg(cx.string(&key)) + .arg(key.clone(cx).into_inner(cx)) .exec(cx)?; Ok(()) diff --git a/crates/neon/src/prelude.rs b/crates/neon/src/prelude.rs index 01b87f55e..837dc5ca1 100644 --- a/crates/neon/src/prelude.rs +++ b/crates/neon/src/prelude.rs @@ -3,8 +3,8 @@ #[doc(no_inline)] pub use crate::{ context::{ - CallKind, ComputeContext, Context, ExecuteContext, FunctionContext, ModuleContext, - TaskContext, + AsyncContext, AsyncFunctionContext, CallKind, ComputeContext, Context, ExecuteContext, + FunctionContext, ModuleContext, TaskContext, }, handle::{Handle, Root}, object::Object,