Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions examples/rust/streams-web-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[package]
name = "streams-web-client"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
rustls = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
url = { workspace = true }
wit-bindgen-wrpc = { workspace = true }
wrpc-transport = { workspace = true }
wrpc-transport-web = { workspace = true }
wtransport = { workspace = true }
143 changes: 143 additions & 0 deletions examples/rust/streams-web-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use core::time::Duration;

use std::sync::Arc;

use anyhow::Context as _;
use bytes::Bytes;
use clap::Parser;
use futures::{stream, StreamExt as _};
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::version::TLS13;
use rustls::{DigitallySignedStruct, SignatureScheme};
use tokio::{time, try_join};
use tokio_stream::wrappers::IntervalStream;
use tracing::debug;
use url::Url;
use wtransport::{ClientConfig, Endpoint};

mod bindings {
wit_bindgen_wrpc::generate!({
with: {
"wrpc-examples:streams/handler": generate
}
});
}

use bindings::wrpc_examples::streams::handler::{echo, Req};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Address to invoke `wrpc-examples:streams/handler.echo` on
#[arg(default_value = "https://localhost:4433")]
addr: Url,
}

#[derive(Debug)]
struct Insecure;

impl ServerCertVerifier for Insecure {
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp_response: &[u8],
_now: UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}

fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
vec![SignatureScheme::ECDSA_NISTP256_SHA256]
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();

let Args { addr } = Args::parse();

let conf = rustls::ClientConfig::builder_with_protocol_versions(&[&TLS13])
.dangerous()
.with_custom_certificate_verifier(Arc::new(Insecure))
.with_no_client_auth();

let ep = Endpoint::client(
ClientConfig::builder()
.with_bind_default()
.with_custom_tls(conf)
.build(),
)
.context("failed to create endpoint")?;

let session = ep
.connect(&addr)
.await
.context("failed to connect to server")?;
let wrpc = wrpc_transport_web::Client::from(session);

let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);

// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);

let (mut numbers, mut bytes, io) = echo(&wrpc, (), Req { numbers, bytes })
.await
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
try_join!(
async {
if let Some(io) = io {
debug!("performing async I/O");
io.await.context("failed to complete async I/O")
} else {
Ok(())
}
},
async {
while let Some(item) = numbers.next().await {
eprintln!("numbers: {item:?}");
}
Ok(())
},
async {
while let Some(item) = bytes.next().await {
eprintln!("bytes: {item:?}");
}
Ok(())
}
)?;
Ok(())
}
4 changes: 4 additions & 0 deletions examples/rust/streams-web-client/wit/deps.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[streams]
path = "../../../wit/streams"
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
1 change: 1 addition & 0 deletions examples/rust/streams-web-client/wit/deps.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
streams = "../../../wit/streams"
17 changes: 17 additions & 0 deletions examples/rust/streams-web-client/wit/deps/streams/streams.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package wrpc-examples:streams;

interface handler {
record req {
numbers: stream<u64>,
bytes: stream<u8>,
}
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
}

world client {
import handler;
}

world server {
export handler;
}
5 changes: 5 additions & 0 deletions examples/rust/streams-web-client/wit/world.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package wrpc-examples:streams-rust-client;

world client {
include wrpc-examples:streams/client;
}
36 changes: 36 additions & 0 deletions examples/rust/streams-web-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "streams-web-server"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
rcgen = { workspace = true, features = ["crypto", "ring", "zeroize"] }
rustls = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "signal"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
wit-bindgen-wrpc = { workspace = true }
wrpc-transport = { workspace = true }
wrpc-transport-web = { workspace = true }
wtransport = { workspace = true }
Loading
Loading