Skip to content

Commit

Permalink
feat(neon): Implement Futures feature
Browse files Browse the repository at this point in the history
  • Loading branch information
kjvalencik committed Mar 9, 2022
1 parent e2c8e58 commit 2b19d8c
Show file tree
Hide file tree
Showing 15 changed files with 457 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ check-legacy = "check --all-targets --no-default-features -p neon -p neon-runtim
clippy-legacy = "clippy --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p tests -p static_tests --features event-handler-api,proc-macros,legacy-runtime -- -A clippy::missing_safety_doc"
clippy-napi = "clippy --all-targets --no-default-features -p neon -p neon-runtime -p neon-build -p neon-macros -p electron-tests -p napi-tests --features napi-experimental -- -A clippy::missing_safety_doc"
neon-test = "test -p neon -p neon-runtime -p neon-build -p neon-macros -p electron-tests -p napi-tests --no-default-features --features=napi-experimental"
neon-doc = "rustdoc --no-default-features --features=napi-experimental -- --cfg docsrs"
neon-doc = "rustdoc --no-default-features --features=futures,napi-experimental -- --cfg docsrs"
neon-doc-test = "test --doc --no-default-features --features=napi-experimental"
13 changes: 12 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ smallvec = "1.4.2"
neon-runtime = { version = "=0.10.0", path = "crates/neon-runtime" }
neon-macros = { version = "=0.10.0", path = "crates/neon-macros", optional = true }

[dependencies.futures-channel]
version = "0.3"
default-features = false
features = ["alloc"]
optional = true

[features]
default = ["legacy-runtime"]

Expand Down Expand Up @@ -72,12 +78,16 @@ docs-only = ["neon-runtime/docs-only"]
# DEPRECATED: Will be removed with `legacy-runtime` since it is enabled by default in Node-API backend.
proc-macros = ["neon-macros"]

# Experimental Rust Futures API
# https://github.com/neon-bindings/rfcs/pull/46
futures = ["futures-channel"]

[package.metadata.docs.rs]
no-default-features = true
rustdoc-args = ["--cfg", "docsrs"]
features = [
"futures",
"napi-experimental",
"proc-macros",
]

[workspace]
Expand All @@ -89,5 +99,6 @@ members = [
"test/static",
"test/electron",
"test/dynamic/native",
"test/futures",
"test/napi"
]
20 changes: 20 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 82 additions & 10 deletions src/event/channel.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::sync::Arc;

use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;

use crate::context::{Context, TaskContext};
use crate::result::NeonResult;
use crate::result::{NeonResult, ResultExt};

#[cfg(feature = "futures")]
use {
futures_channel::oneshot,
std::future::Future,
std::pin::Pin,
std::task::{self, Poll},
};

#[cfg(not(feature = "futures"))]
// Synchronous oneshot channel API compatible with `futures-channel`
mod oneshot {
use std::sync::mpsc;

pub(super) use std::sync::mpsc::Receiver;

pub(super) fn channel<T>() -> (mpsc::SyncSender<T>, mpsc::Receiver<T>) {
mpsc::sync_channel(1)
}
}

type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

Expand Down Expand Up @@ -121,7 +141,7 @@ impl Channel {
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(1);
let (tx, rx) = oneshot::channel();
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

Expand Down Expand Up @@ -215,16 +235,56 @@ impl Drop for Channel {
/// thread with [`Channel::send`].
pub struct JoinHandle<T> {
// `Err` is always `Throw`, but `Throw` cannot be sent across threads
rx: mpsc::Receiver<Result<T, ()>>,
rx: oneshot::Receiver<Result<T, ()>>,
}

impl<T> JoinHandle<T> {
/// Waits for the associated closure to finish executing
///
/// If the closure panics or throws an exception, `Err` is returned
///
/// **Warning**: This should not be called from the JavaScript main thread.
/// If it is called from the JavaScript main thread, it will _deadlock_.
#[cfg(any(not(feature = "futures"), docsrs))]
#[cfg_attr(docsrs, doc(cfg(not(feature = "futures"))))]
pub fn join(self) -> Result<T, JoinError> {
self.rx
.recv()
#[cfg(feature = "futures")]
{
unimplemented!("`JoinHandle::join` is not implemented with the `futures` feature")
}

#[cfg(not(feature = "futures"))]
JoinError::map_res(self.rx.recv())
}
}

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
JoinError::map_poll(&mut self.rx, cx)
}
}

impl JoinError {
#[cfg(feature = "futures")]
// Helper for writing a `Future` implementation by wrapping a `Future` and
// mapping to `Result<T, JoinError>`
pub(crate) fn map_poll<T, E>(
f: &mut (impl Future<Output = Result<Result<T, ()>, E>> + Unpin),
cx: &mut task::Context,
) -> Poll<Result<T, Self>> {
match Pin::new(f).poll(cx) {
Poll::Ready(result) => Poll::Ready(Self::map_res(result)),
Poll::Pending => Poll::Pending,
}
}

// Helper for mapping a nested `Result` from joining to a `Result<T, JoinError>`
pub(crate) fn map_res<T, E>(res: Result<Result<T, ()>, E>) -> Result<T, Self> {
res
// If the sending side dropped without sending, it must have panicked
.map_err(|_| JoinError(JoinErrorType::Panic))?
// If the closure returned `Err`, a JavaScript exception was thrown
Expand All @@ -237,6 +297,15 @@ impl<T> JoinHandle<T> {
/// or threw an exception.
pub struct JoinError(JoinErrorType);

impl JoinError {
fn as_str(&self) -> &str {
match &self.0 {
JoinErrorType::Panic => "Closure panicked before returning",
JoinErrorType::Throw => "Closure threw an exception",
}
}
}

#[derive(Debug)]
enum JoinErrorType {
Panic,
Expand All @@ -245,15 +314,18 @@ enum JoinErrorType {

impl std::fmt::Display for JoinError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
JoinErrorType::Panic => f.write_str("Closure panicked before returning"),
JoinErrorType::Throw => f.write_str("Closure threw an exception"),
}
f.write_str(self.as_str())
}
}

impl std::error::Error for JoinError {}

impl<T> ResultExt<T> for Result<T, JoinError> {
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T> {
self.or_else(|err| cx.throw_error(err.as_str()))
}
}

/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
/// The most likely cause of a failure is that Node is shutting down. This may occur if the
Expand Down
10 changes: 10 additions & 0 deletions src/result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,13 @@ pub trait JsResultExt<'a, V: Value> {
pub trait ResultExt<T> {
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T>;
}

impl<'a, 'b, T, E> ResultExt<Handle<'a, T>> for Result<Handle<'a, T>, Handle<'b, E>>
where
T: Value,
E: Value,
{
fn or_throw<'cx, C: Context<'cx>>(self, cx: &mut C) -> JsResult<'a, T> {
self.or_else(|err| cx.throw(err))
}
}
3 changes: 3 additions & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ pub use self::buffer::types::{JsArrayBuffer, JsBuffer, JsTypedArray};
#[cfg(feature = "napi-5")]
pub use self::date::{DateError, DateErrorKind, JsDate};
pub use self::error::JsError;
#[cfg(all(feature = "napi-5", feature = "futures"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))]
pub use self::promise::JsFuture;
#[cfg(feature = "napi-1")]
pub use self::promise::{Deferred, JsPromise};

Expand Down
Loading

0 comments on commit 2b19d8c

Please sign in to comment.