Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WASM support for client #388

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ readme = "../README.md"
description = "Proc macros for tarpc."

[features]
default = ["client", "server"]
serde1 = []
client = []
server = []

[badges]
travis-ci = { repository = "google/tarpc" }
Expand Down
25 changes: 25 additions & 0 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,16 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
ServiceGenerator {
response_fut_name,
service_ident: ident,
#[cfg(feature = "server")]
server_ident: &format_ident!("Serve{}", ident),
response_fut_ident: &Ident::new(response_fut_name, ident.span()),
#[cfg(feature = "client")]
client_ident: &format_ident!("{}Client", ident),
request_ident: &format_ident!("{}Request", ident),
response_ident: &format_ident!("{}Response", ident),
vis,
args,
#[cfg(feature = "client")]
method_attrs: &rpcs.iter().map(|rpc| &*rpc.attrs).collect::<Vec<_>>(),
method_idents: &methods,
request_names: &request_names,
Expand Down Expand Up @@ -432,9 +435,11 @@ fn verify_types_were_provided(
// the client stub.
struct ServiceGenerator<'a> {
service_ident: &'a Ident,
#[cfg(feature = "server")]
server_ident: &'a Ident,
response_fut_ident: &'a Ident,
response_fut_name: &'a str,
#[cfg(feature = "client")]
client_ident: &'a Ident,
request_ident: &'a Ident,
response_ident: &'a Ident,
Expand All @@ -445,6 +450,7 @@ struct ServiceGenerator<'a> {
future_types: &'a [Type],
method_idents: &'a [&'a Ident],
request_names: &'a [String],
#[cfg(feature = "client")]
method_attrs: &'a [&'a [Attribute]],
args: &'a [&'a [PatType]],
return_types: &'a [&'a Type],
Expand All @@ -461,6 +467,7 @@ impl<'a> ServiceGenerator<'a> {
future_types,
return_types,
service_ident,
#[cfg(feature = "server")]
server_ident,
..
} = self;
Expand Down Expand Up @@ -490,6 +497,7 @@ impl<'a> ServiceGenerator<'a> {
},
);

#[cfg(feature = "server")]
quote! {
#( #attrs )*
#vis trait #service_ident: Sized {
Expand All @@ -502,8 +510,16 @@ impl<'a> ServiceGenerator<'a> {
}
}
}
#[cfg(not(feature = "server"))]
quote! {
#( #attrs )*
#vis trait #service_ident: Sized {
#( #types_and_fns )*
}
}
}

#[cfg(feature = "server")]
fn struct_server(&self) -> TokenStream2 {
let &Self {
vis, server_ident, ..
Expand All @@ -518,6 +534,7 @@ impl<'a> ServiceGenerator<'a> {
}
}

#[cfg(feature = "server")]
fn impl_serve_for_server(&self) -> TokenStream2 {
let &Self {
request_ident,
Expand Down Expand Up @@ -675,6 +692,7 @@ impl<'a> ServiceGenerator<'a> {
}
}

#[cfg(feature = "client")]
fn struct_client(&self) -> TokenStream2 {
let &Self {
vis,
Expand All @@ -693,6 +711,7 @@ impl<'a> ServiceGenerator<'a> {
}
}

#[cfg(feature = "client")]
fn impl_client_new(&self) -> TokenStream2 {
let &Self {
client_ident,
Expand Down Expand Up @@ -724,6 +743,7 @@ impl<'a> ServiceGenerator<'a> {
}
}

#[cfg(feature = "client")]
fn impl_client_rpc_methods(&self) -> TokenStream2 {
let &Self {
client_ident,
Expand Down Expand Up @@ -766,15 +786,20 @@ impl<'a> ToTokens for ServiceGenerator<'a> {
fn to_tokens(&self, output: &mut TokenStream2) {
output.extend(vec![
self.trait_service(),
#[cfg(feature = "server")]
self.struct_server(),
#[cfg(feature = "server")]
self.impl_serve_for_server(),
self.enum_request(),
self.enum_response(),
self.enum_response_future(),
self.impl_debug_for_response_future(),
self.impl_future_for_response_future(),
#[cfg(feature = "client")]
self.struct_client(),
#[cfg(feature = "client")]
self.impl_client_new(),
#[cfg(feature = "client")]
self.impl_client_rpc_methods(),
])
}
Expand Down
20 changes: 14 additions & 6 deletions tarpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ readme = "README.md"
description = "An RPC framework for Rust with a focus on ease of use."

[features]
default = []
default = ["server", "client"]

server = ["tarpc-plugins/server"]
client = ["tarpc-plugins/client"]
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
tokio1 = ["tokio/rt"]
serde-transport = ["serde1", "tokio1", "tokio-serde", "tokio-util/codec"]
Expand All @@ -44,15 +46,13 @@ travis-ci = { repository = "google/tarpc" }
anyhow = "1.0"
fnv = "1.0"
futures = "0.3"
humantime = "2.0"
pin-project = "1.0"
rand = "0.8"
serde = { optional = true, version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.12" }
tarpc-plugins = { path = "../plugins", version = "0.12", default-features = false }
thiserror = "1.0"
tokio = { version = "1", features = ["time"] }
tokio-util = { version = "0.7.3", features = ["time"] }
tokio = { version = "1" , features = ["sync"]}
tokio-serde = { optional = true, version = "0.8" }
tracing = { version = "0.1", default-features = false, features = [
"attributes",
Expand All @@ -61,6 +61,14 @@ tracing = { version = "0.1", default-features = false, features = [
tracing-opentelemetry = { version = "0.17.2", default-features = false }
opentelemetry = { version = "0.17.0", default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasmtimer = "0.0.1"
tokio-util = { version = "0.7.3", features = ["codec"], default-features = false}
getrandom = { version = "0.2", features = ["js"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-util = { version = "0.7.3", features = ["time", "codec"], default-features = false}
humantime = "2.0"

[dev-dependencies]
assert_matches = "1.4"
Expand All @@ -75,7 +83,7 @@ opentelemetry-jaeger = { version = "0.16.0", features = ["rt-tokio"] }
pin-utils = "0.1.0-alpha"
serde_bytes = "0.11"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio = { version = "1", features = ["full", "test-util", "time"] }
tokio-serde = { version = "0.8", features = ["json", "bincode"] }
trybuild = "1.0"

Expand Down
13 changes: 11 additions & 2 deletions tarpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,23 @@ impl<Req, Resp> Clone for Channel<Req, Resp> {
impl<Req, Resp> Channel<Req, Resp> {
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves to the response.
#[tracing::instrument(
#[cfg_attr(not(target_arch="wasm32"),tracing::instrument(
name = "RPC",
skip(self, ctx, request_name, request),
fields(
rpc.trace_id = tracing::field::Empty,
rpc.deadline = %humantime::format_rfc3339(ctx.deadline),
otel.kind = "client",
otel.name = request_name)
)]
))]
#[cfg_attr(target_arch="wasm32", tracing::instrument(
name = "RPC",
skip(self, ctx, request_name, request),
fields(
rpc.trace_id = tracing::field::Empty,
otel.kind = "client",
otel.name = request_name)
))]
pub async fn call(
&self,
mut ctx: context::Context,
Expand Down Expand Up @@ -302,6 +310,7 @@ where
Close(#[source] E),
/// Could not poll expired requests.
#[error("could not poll expired requests")]
#[cfg(not(target_arch = "wasm32"))]
Timer(#[source] tokio::time::error::Error),
}

Expand Down
5 changes: 5 additions & 0 deletions tarpc/src/client/in_flight_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ use std::{
task::{Context, Poll},
};
use tokio::sync::oneshot;

#[cfg(not(target_arch = "wasm32"))]
use tokio_util::time::delay_queue::{self, DelayQueue};
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio_util::delay_queue::{self, DelayQueue};

use tracing::Span;

/// Requests already written to the wire that haven't yet received responses.
Expand Down
16 changes: 11 additions & 5 deletions tarpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
use crate::trace::{self, TraceId};
use opentelemetry::trace::TraceContextExt;
use static_assertions::assert_impl_all;
use std::{
convert::TryFrom,
time::{Duration, SystemTime},
};
#[cfg(not(target_arch = "wasm32"))]
use std::time::SystemTime;
use std::{convert::TryFrom, time::Duration};
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::SystemTime;

use tracing_opentelemetry::OpenTelemetrySpanExt;

/// A request context that carries request-scoped information like deadlines and trace information.
Expand Down Expand Up @@ -41,7 +43,11 @@ pub struct Context {
#[cfg(feature = "serde1")]
mod absolute_to_relative_time {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub use std::time::{Duration, SystemTime};
pub use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::SystemTime;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::SystemTime;

pub fn serialize<S>(deadline: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down
10 changes: 9 additions & 1 deletion tarpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,14 @@ pub use tarpc_plugins::service;
///
/// Note that this won't touch functions unless they have been annotated with
/// `async`, meaning that this should not break existing code.
#[cfg(feature = "server")]
pub use tarpc_plugins::server;

pub(crate) mod cancellations;
#[cfg(feature = "client")]
pub mod client;
pub mod context;
#[cfg(feature = "server")]
pub mod server;
pub mod transport;
pub(crate) mod util;
Expand All @@ -311,7 +314,12 @@ pub use crate::transport::sealed::Transport;

use anyhow::Context as _;
use futures::task::*;
use std::{error::Error, fmt::Display, io, time::SystemTime};
use std::{error::Error, fmt::Display, io};

#[cfg(not(target_arch = "wasm32"))]
use std::time::SystemTime;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::SystemTime;

/// A message from a client to a server.
#[derive(Debug)]
Expand Down
5 changes: 5 additions & 0 deletions tarpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

//! Provides a server that concurrently handles many connections sending multiplexed requests.

#[cfg(target_arch = "wasm32")]
compile_error!(
"Server can not compile to WASM targets. Please exclude \"server\" from features list."
);

use crate::{
cancellations::{cancellations, CanceledRequests, RequestCancellation},
context::{self, SpanExt},
Expand Down
2 changes: 1 addition & 1 deletion tarpc/src/server/in_flight_requests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::util::{Compact, TimeUntil};
use fnv::FnvHashMap;
use futures::future::{AbortHandle, AbortRegistration};
use std::time::SystemTime;
use std::{
collections::hash_map,
task::{Context, Poll},
time::SystemTime,
};
use tokio_util::time::delay_queue::{self, DelayQueue};
use tracing::Span;
Expand Down
7 changes: 6 additions & 1 deletion tarpc/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
use std::{
collections::HashMap,
hash::{BuildHasher, Hash},
time::{Duration, SystemTime},
time::Duration,
};

#[cfg(not(target_arch = "wasm32"))]
use std::time::SystemTime;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::SystemTime;

#[cfg(feature = "serde1")]
#[cfg_attr(docsrs, doc(cfg(feature = "serde1")))]
pub mod serde;
Expand Down