diff --git a/.cargo/config.toml b/.cargo/config.toml index 1e621e083..4f56ff8cf 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,7 +1,7 @@ [alias] # Neon defines mutually exclusive feature flags which prevents using `cargo clippy --all-features` # The following aliases simplify linting the entire workspace -neon-check = " check --all --all-targets --features napi-experimental" -neon-clippy = "clippy --all --all-targets --features napi-experimental -- -A clippy::missing_safety_doc" -neon-test = " test --all --features=napi-experimental" -neon-doc = " rustdoc -p neon --features=napi-experimental -- --cfg docsrs" +neon-check = " check --all --all-targets --features napi-experimental,futures" +neon-clippy = "clippy --all --all-targets --features napi-experimental,futures -- -A clippy::missing_safety_doc" +neon-test = " test --all --features=napi-experimental,futures" +neon-doc = " rustdoc -p neon --features=napi-experimental,futures -- --cfg docsrs" diff --git a/crates/neon/Cargo.toml b/crates/neon/Cargo.toml index dd9077321..ec1f88bfa 100644 --- a/crates/neon/Cargo.toml +++ b/crates/neon/Cargo.toml @@ -25,9 +25,19 @@ semver = "0.9.0" smallvec = "1.4.2" neon-macros = { version = "=0.10.1", path = "../neon-macros" } +[dependencies.tokio] +version = "1.18.2" +default-features = false +features = ["sync"] +optional = true + [features] default = ["napi-1"] +# Experimental Rust Futures API +# https://github.com/neon-bindings/rfcs/pull/46 +futures = ["tokio"] + # Default N-API version. Prefer to select a minimum required version. # DEPRECATED: This is an alias that should be removed napi-runtime = ["napi-8"] @@ -62,5 +72,6 @@ proc-macros = [] [package.metadata.docs.rs] rustdoc-args = ["--cfg", "docsrs"] features = [ + "futures", "napi-experimental", ] diff --git a/crates/neon/src/event/channel.rs b/crates/neon/src/event/channel.rs index 8e44de8a5..801d90873 100644 --- a/crates/neon/src/event/channel.rs +++ b/crates/neon/src/event/channel.rs @@ -1,14 +1,38 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Arc, +use std::{ + error, fmt, mem, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use crate::{ context::{Context, TaskContext}, result::NeonResult, + result::ResultExt, sys::{raw::Env, tsfn::ThreadsafeFunction}, }; +#[cfg(feature = "futures")] +use { + std::future::Future, + std::pin::Pin, + std::task::{self, Poll}, + tokio::sync::oneshot, +}; + +#[cfg(not(feature = "futures"))] +// Synchronous oneshot channel API compatible with `futures-channel` +mod oneshot { + use std::sync::mpsc; + + pub(super) use std::sync::mpsc::Receiver; + + pub(super) fn channel() -> (mpsc::SyncSender, mpsc::Receiver) { + mpsc::sync_channel(1) + } +} + type Callback = Box; /// Channel for scheduling Rust closures to execute on the JavaScript main thread. @@ -70,8 +94,8 @@ pub struct Channel { has_ref: bool, } -impl std::fmt::Debug for Channel { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for Channel { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("Channel") } } @@ -131,9 +155,9 @@ impl Channel { T: Send + 'static, F: FnOnce(TaskContext) -> NeonResult + Send + 'static, { - let (tx, rx) = mpsc::sync_channel(1); + let (tx, rx) = oneshot::channel(); let callback = Box::new(move |env| { - let env = unsafe { std::mem::transmute(env) }; + let env = unsafe { mem::transmute(env) }; // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because // N-API creates a `HandleScope` before calling the callback. @@ -225,20 +249,34 @@ impl Drop for Channel { /// thread with [`Channel::send`]. pub struct JoinHandle { // `Err` is always `Throw`, but `Throw` cannot be sent across threads - rx: mpsc::Receiver>, + rx: oneshot::Receiver>, } impl JoinHandle { /// Waits for the associated closure to finish executing /// /// If the closure panics or throws an exception, `Err` is returned + /// + /// # Panics + /// + /// This function panics if called within an asynchronous execution context. pub fn join(self) -> Result { - self.rx - .recv() - // If the sending side dropped without sending, it must have panicked - .map_err(|_| JoinError(JoinErrorType::Panic))? - // If the closure returned `Err`, a JavaScript exception was thrown - .map_err(|_| JoinError(JoinErrorType::Throw)) + #[cfg(feature = "futures")] + let result = self.rx.blocking_recv(); + #[cfg(not(feature = "futures"))] + let result = self.rx.recv(); + + JoinError::map_res(result) + } +} + +#[cfg(feature = "futures")] +#[cfg_attr(docsrs, doc(cfg(feature = "futures")))] +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { + JoinError::map_poll(&mut self.rx, cx) } } @@ -253,16 +291,50 @@ enum JoinErrorType { Throw, } -impl std::fmt::Display for JoinError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl JoinError { + fn as_str(&self) -> &str { match &self.0 { - JoinErrorType::Panic => f.write_str("Closure panicked before returning"), - JoinErrorType::Throw => f.write_str("Closure threw an exception"), + JoinErrorType::Panic => "Closure panicked before returning", + JoinErrorType::Throw => "Closure threw an exception", + } + } + + #[cfg(feature = "futures")] + // Helper for writing a `Future` implementation by wrapping a `Future` and + // mapping to `Result` + pub(crate) fn map_poll( + f: &mut (impl Future, E>> + Unpin), + cx: &mut task::Context, + ) -> Poll> { + match Pin::new(f).poll(cx) { + Poll::Ready(result) => Poll::Ready(Self::map_res(result)), + Poll::Pending => Poll::Pending, } } + + // Helper for mapping a nested `Result` from joining to a `Result` + pub(crate) fn map_res(res: Result, E>) -> Result { + res + // If the sending side dropped without sending, it must have panicked + .map_err(|_| JoinError(JoinErrorType::Panic))? + // If the closure returned `Err`, a JavaScript exception was thrown + .map_err(|_| JoinError(JoinErrorType::Throw)) + } +} + +impl fmt::Display for JoinError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.as_str()) + } } -impl std::error::Error for JoinError {} +impl error::Error for JoinError {} + +impl ResultExt for Result { + fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult { + self.or_else(|err| cx.throw_error(err.as_str())) + } +} /// Error indicating that a closure was unable to be scheduled to execute on the event loop. /// @@ -275,19 +347,19 @@ impl std::error::Error for JoinError {} #[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))] pub struct SendError; -impl std::fmt::Display for SendError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "SendError") } } -impl std::fmt::Debug for SendError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(self, f) +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(self, f) } } -impl std::error::Error for SendError {} +impl error::Error for SendError {} struct ChannelState { tsfn: ThreadsafeFunction, diff --git a/crates/neon/src/result/mod.rs b/crates/neon/src/result/mod.rs index 36dceb967..9a2c38ddc 100644 --- a/crates/neon/src/result/mod.rs +++ b/crates/neon/src/result/mod.rs @@ -80,3 +80,13 @@ pub trait JsResultExt<'a, V: Value> { pub trait ResultExt { fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult; } + +impl<'a, 'b, T, E> ResultExt> for Result, Handle<'b, E>> +where + T: Value, + E: Value, +{ + fn or_throw<'cx, C: Context<'cx>>(self, cx: &mut C) -> JsResult<'a, T> { + self.or_else(|err| cx.throw(err)) + } +} diff --git a/crates/neon/src/sys/async_work.rs b/crates/neon/src/sys/async_work.rs index e97b46d48..717b16ab3 100644 --- a/crates/neon/src/sys/async_work.rs +++ b/crates/neon/src/sys/async_work.rs @@ -5,7 +5,7 @@ //! a more idiomatic Rust ownership pattern by passing the output of `execute` //! into the input of `complete`. //! -//! https://nodejs.org/api/n-api.html#n_api_simple_asynchronous_operations +//! use std::{ ffi::c_void, diff --git a/crates/neon/src/sys/promise.rs b/crates/neon/src/sys/promise.rs index d9e30cd09..f14a48366 100644 --- a/crates/neon/src/sys/promise.rs +++ b/crates/neon/src/sys/promise.rs @@ -1,6 +1,6 @@ //! JavaScript Promise and Deferred handle //! -//! https://nodejs.org/api/n-api.html#n_api_promises +//! use std::mem::MaybeUninit; diff --git a/crates/neon/src/types/mod.rs b/crates/neon/src/types/mod.rs index f6ee3188a..24e20050a 100644 --- a/crates/neon/src/types/mod.rs +++ b/crates/neon/src/types/mod.rs @@ -116,6 +116,10 @@ pub use self::{ #[cfg(feature = "napi-5")] pub use self::date::{DateError, DateErrorKind, JsDate}; +#[cfg(all(feature = "napi-5", feature = "futures"))] +#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))] +pub use self::promise::JsFuture; + pub(crate) fn build<'a, T: Managed, F: FnOnce(&mut raw::Local) -> bool>( env: Env, init: F, diff --git a/crates/neon/src/types/promise.rs b/crates/neon/src/types/promise.rs index 880dbceb4..822e46c85 100644 --- a/crates/neon/src/types/promise.rs +++ b/crates/neon/src/types/promise.rs @@ -23,6 +23,19 @@ use { std::sync::Arc, }; +#[cfg(all(feature = "napi-5", feature = "futures"))] +use { + crate::context::internal::ContextInternal, + crate::event::JoinError, + crate::result::NeonResult, + crate::types::{JsFunction, JsValue}, + std::future::Future, + std::pin::Pin, + std::sync::Mutex, + std::task::{self, Poll}, + tokio::sync::oneshot, +}; + const BOUNDARY: FailureBoundary = FailureBoundary { both: "A panic and exception occurred while resolving a `neon::types::Deferred`", exception: "An exception occurred while resolving a `neon::types::Deferred`", @@ -51,6 +64,97 @@ impl JsPromise { (deferred, Handle::new_internal(JsPromise(promise))) } + + /// Creates a new `Promise` immediately resolved with the given value. If the value is a + /// `Promise` or a then-able, it will be flattened. + /// + /// `JsPromise::resolve` is useful to ensure a value that might not be a `Promise` or + /// might not be a native promise is converted to a `Promise` before use. + pub fn resolve<'a, C: Context<'a>, T: Value>(cx: &mut C, value: Handle) -> Handle<'a, Self> { + let (deferred, promise) = cx.promise(); + deferred.resolve(cx, value); + promise + } + + /// Creates a nwe `Promise` immediately rejected with the given error. + pub fn reject<'a, C: Context<'a>, E: Value>(cx: &mut C, err: Handle) -> Handle<'a, Self> { + let (deferred, promise) = cx.promise(); + deferred.reject(cx, err); + promise + } + + #[cfg(all(feature = "napi-5", feature = "futures"))] + #[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))] + /// Creates a [`Future`](std::future::Future) that can be awaited to receive the result of a + /// JavaScript `Promise`. + /// + /// A callback must be provided that maps a `Result` representing the resolution or rejection of + /// the `Promise` and returns a value as the `Future` output. + /// + /// _Note_: Unlike `Future`, `Promise` are eagerly evaluated and so are `JsFuture`. + pub fn to_future<'a, O, C, F>(&self, cx: &mut C, f: F) -> NeonResult> + where + O: Send + 'static, + C: Context<'a>, + F: FnOnce(TaskContext, Result, Handle>) -> NeonResult + + Send + + 'static, + { + let then = self.get::(cx, "then")?; + let catch = self.get::(cx, "catch")?; + + let (tx, rx) = oneshot::channel(); + let take_state = { + // Note: If this becomes a bottleneck, `unsafe` could be used to avoid it. + // The promise spec guarantees that it will only be used once. + let state = Arc::new(Mutex::new(Some((f, tx)))); + + move || { + state + .lock() + .ok() + .and_then(|mut lock| lock.take()) + // This should never happen because `self` is a native `Promise` + // and settling multiple times is a violation of the spec. + .expect("Attempted to settle JsFuture multiple times") + } + }; + + let resolve = JsFunction::new(cx, { + let take_state = take_state.clone(); + + move |mut cx| { + let (f, tx) = take_state(); + let v = cx.argument::(0)?; + + TaskContext::with_context(cx.env(), move |cx| { + // Error indicates that the `Future` has already dropped; ignore + let _ = tx.send(f(cx, Ok(v)).map_err(|_| ())); + }); + + Ok(cx.undefined()) + } + })?; + + let reject = JsFunction::new(cx, { + move |mut cx| { + let (f, tx) = take_state(); + let v = cx.argument::(0)?; + + TaskContext::with_context(cx.env(), move |cx| { + // Error indicates that the `Future` has already dropped; ignore + let _ = tx.send(f(cx, Err(v)).map_err(|_| ())); + }); + + Ok(cx.undefined()) + } + })?; + + then.exec(cx, Handle::new_internal(Self(self.0)), [resolve.upcast()])?; + catch.exec(cx, Handle::new_internal(Self(self.0)), [reject.upcast()])?; + + Ok(JsFuture { rx }) + } } unsafe impl TransparentNoCopyWrapper for JsPromise { @@ -240,3 +344,24 @@ impl Drop for Deferred { } } } + +#[cfg(all(feature = "napi-5", feature = "futures"))] +#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))] +/// A [`Future`](std::future::Future) created from a [`JsPromise`]. +/// +/// Unlike typical `Future`, `JsFuture` are eagerly executed because they +/// are backed by a `Promise`. +pub struct JsFuture { + // `Err` is always `Throw`, but `Throw` cannot be sent across threads + rx: oneshot::Receiver>, +} + +#[cfg(all(feature = "napi-5", feature = "futures"))] +#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))] +impl Future for JsFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll { + JoinError::map_poll(&mut self.rx, cx) + } +} diff --git a/test/napi/Cargo.toml b/test/napi/Cargo.toml index 0922d48e1..29ac53e9a 100644 --- a/test/napi/Cargo.toml +++ b/test/napi/Cargo.toml @@ -11,8 +11,9 @@ crate-type = ["cdylib"] [dependencies] once_cell = "1" +tokio = { version = "1", features = ["rt-multi-thread"] } [dependencies.neon] version = "*" path = "../../crates/neon" -features = ["napi-8"] +features = ["futures", "napi-experimental"] diff --git a/test/napi/lib/futures.js b/test/napi/lib/futures.js new file mode 100644 index 000000000..ae38df5c1 --- /dev/null +++ b/test/napi/lib/futures.js @@ -0,0 +1,58 @@ +const assert = require("assert"); + +const addon = require(".."); + +async function assertRejects(f, ...args) { + try { + await f(); + } catch (err) { + assert.throws(() => { + throw err; + }, ...args); + + return; + } + + assert.throws(() => {}, ...args); +} + +describe("Futures", () => { + describe("Channel", () => { + it("should be able to await channel result", async () => { + const sum = await addon.lazy_async_add( + () => 1, + () => 2 + ); + + assert.strictEqual(sum, 3); + }); + + it("exceptions should be handled", async () => { + await assertRejects(async () => { + await addon.lazy_async_add( + () => 1, + () => { + throw new Error("Failed to get Y"); + } + ); + }, /exception/i); + }); + }); + + describe("JsFuture", () => { + it("should be able to convert a promise to a future", async () => { + const nums = new Float64Array([1, 2, 3, 4]); + const sum = await addon.lazy_async_sum(async () => nums); + + assert.strictEqual(sum, 10); + }); + + it("should catch promise rejection", async () => { + await assertRejects(async () => { + await addon.lazy_async_sum(async () => { + throw new Error("Oh, no!"); + }); + }, /exception/i); + }); + }); +}); diff --git a/test/napi/src/js/futures.rs b/test/napi/src/js/futures.rs new file mode 100644 index 000000000..f7218f9bb --- /dev/null +++ b/test/napi/src/js/futures.rs @@ -0,0 +1,81 @@ +use { + neon::{prelude::*, types::buffer::TypedArray}, + once_cell::sync::OnceCell, + tokio::runtime::Runtime, +}; + +fn runtime<'a, C: Context<'a>>(cx: &mut C) -> NeonResult<&'static Runtime> { + static RUNTIME: OnceCell = OnceCell::new(); + + RUNTIME + .get_or_try_init(|| Runtime::new()) + .or_else(|err| cx.throw_error(&err.to_string())) +} + +// Accepts two functions that take no parameters and return numbers. +// Resolves with the sum of the two numbers. +// Purpose: Test the `Future` implementation on `JoinHandle` +pub fn lazy_async_add(mut cx: FunctionContext) -> JsResult { + let get_x = cx.argument::(0)?.root(&mut cx); + let get_y = cx.argument::(1)?.root(&mut cx); + let channel = cx.channel(); + let runtime = runtime(&mut cx)?; + let (deferred, promise) = cx.promise(); + + runtime.spawn(async move { + let result = channel + .send(move |mut cx| { + let get_x = get_x.into_inner(&mut cx); + let get_y = get_y.into_inner(&mut cx); + + let x: Handle = get_x.call_with(&cx).apply(&mut cx)?; + let y: Handle = get_y.call_with(&cx).apply(&mut cx)?; + + Ok((x.value(&mut cx), y.value(&mut cx))) + }) + .await + .map(|(x, y)| x + y); + + let _ = deferred.settle_with(&channel, move |mut cx| { + let result = result.or_throw(&mut cx)?; + + Ok(cx.number(result)) + }); + }); + + Ok(promise) +} + +// Accepts a function that returns a `Promise`. +// Resolves with the sum of all numbers. +// Purpose: Test `JsPromise::to_future`. +pub fn lazy_async_sum(mut cx: FunctionContext) -> JsResult { + let nums = cx + .argument::(0)? + .call_with(&cx) + .apply::(&mut cx)? + .to_future(&mut cx, |mut cx, nums| { + let nums = nums + .or_throw(&mut cx)? + .downcast_or_throw::, _>(&mut cx)? + .as_slice(&mut cx) + .to_vec(); + + Ok(nums) + })?; + + let (deferred, promise) = cx.promise(); + let channel = cx.channel(); + let runtime = runtime(&mut cx)?; + + runtime.spawn(async move { + let result = nums.await.map(|nums| nums.into_iter().sum::()); + let _ = deferred.settle_with(&channel, move |mut cx| { + let result = result.or_throw(&mut cx)?; + + Ok(cx.number(result)) + }); + }); + + Ok(promise) +} diff --git a/test/napi/src/js/objects.rs b/test/napi/src/js/objects.rs index 2a703bf4b..f1c9409c5 100644 --- a/test/napi/src/js/objects.rs +++ b/test/napi/src/js/objects.rs @@ -1,6 +1,9 @@ use std::borrow::Cow; -use neon::{prelude::*, types::buffer::TypedArray}; +use neon::{ + prelude::*, + types::buffer::{BorrowError, TypedArray}, +}; pub fn return_js_global_object(mut cx: FunctionContext) -> JsResult { Ok(cx.global()) @@ -125,7 +128,7 @@ pub fn read_u8_typed_array(mut cx: FunctionContext) -> JsResult { pub fn copy_typed_array(mut cx: FunctionContext) -> JsResult { let source = cx.argument::>(0)?; let mut dest = cx.argument::>(1)?; - let mut run = || { + let mut run = || -> Result<_, BorrowError> { let lock = cx.lock(); let source = source.try_borrow(&lock)?; let mut dest = dest.try_borrow_mut(&lock)?; diff --git a/test/napi/src/lib.rs b/test/napi/src/lib.rs index f26a45a58..3b8d093bd 100644 --- a/test/napi/src/lib.rs +++ b/test/napi/src/lib.rs @@ -12,6 +12,7 @@ mod js { pub mod date; pub mod errors; pub mod functions; + pub mod futures; pub mod numbers; pub mod objects; pub mod strings; @@ -341,5 +342,9 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("get_or_init", js::workers::get_or_init)?; cx.export_function("get_or_init_clone", js::workers::get_or_init_clone)?; + // Futures + cx.export_function("lazy_async_add", js::futures::lazy_async_add)?; + cx.export_function("lazy_async_sum", js::futures::lazy_async_sum)?; + Ok(()) }