Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming output #136

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

[dependencies]
anyhow = "1.0"
async-stream = "0.3"
async-stream = "0.3.2"
async-trait = "0.1"
bindle = { version = "0.3", default-features = false, features = ["client", "server", "caching"] }
cap-std = "^0.22"
Expand Down Expand Up @@ -36,3 +36,5 @@
wasmtime-cache = "0.33"
wat = "1.0.37"
chrono = "0.4.19"
futures-util = "0.3.17"
futures-core = "0.3.17"
8 changes: 4 additions & 4 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl RoutingTable {
let request_context = RequestContext {
client_addr,
};
let response = rte.handle_request(&parts, data, &request_context, &self.global_context);
let response = rte.handle_request(&parts, data, &request_context, &self.global_context).await;
Ok(response)
},
Err(_) => Ok(not_found()),
Expand Down Expand Up @@ -149,7 +149,7 @@ impl RoutingTableEntry {
// TODO: I don't think this rightly belongs here. But
// reasonable place to at least understand the decomposition and
// dependencies.
pub fn handle_request(
pub async fn handle_request(
&self,
req: &Parts,
body: Vec<u8>,
Expand All @@ -159,7 +159,7 @@ impl RoutingTableEntry {
match &self.handler_info {
RouteHandler::HealthCheck => Response::new(Body::from("OK")),
RouteHandler::Wasm(w) => {
let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key());
let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()).await;
match response {
Ok(res) => res,
Err(e) => {
Expand Down Expand Up @@ -352,7 +352,7 @@ fn append_one_dynamic_route(routing_table_entry: &RoutingTableEntry, wasm_route_
}
}

fn build_wasi_context_for_dynamic_route_query(redirects: crate::wasm_module::IOStreamRedirects) -> wasi_common::WasiCtx {
fn build_wasi_context_for_dynamic_route_query(redirects: crate::wasm_module::IOStreamRedirects<Vec<u8>>) -> wasi_common::WasiCtx {
let builder = wasi_cap_std_sync::WasiCtxBuilder::new()
.stderr(Box::new(redirects.stderr))
.stdout(Box::new(redirects.stdout));
Expand Down
98 changes: 57 additions & 41 deletions src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{collections::HashMap};
use std::sync::{Arc, RwLock};

use wasi_cap_std_sync::Dir;
use hyper::{
Expand All @@ -16,8 +15,9 @@ use crate::dispatcher::RoutePattern;
use crate::http_util::{internal_error, parse_cgi_headers};
use crate::request::{RequestContext, RequestGlobalContext};

use crate::stream_writer::StreamWriter;
use crate::wasm_module::WasmModuleSource;
use crate::wasm_runner::{prepare_stdio_streams, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions};
use crate::wasm_runner::{prepare_stdio_streams_for_http, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions};

#[derive(Clone, Debug)]
pub enum RouteHandler {
Expand All @@ -36,41 +36,79 @@ pub struct WasmRouteHandler {
}

impl WasmRouteHandler {
pub fn handle_request(
pub async fn handle_request(
&self,
matched_route: &RoutePattern,
req: &Parts,
body: Vec<u8>,
request_body: Vec<u8>,
request_context: &RequestContext,
global_context: &RequestGlobalContext,
logging_key: String,
) -> Result<Response<Body>, anyhow::Error> {

// These broken-out functions are slightly artificial but help solve some lifetime
// issues (where otherwise you get errors about things not being Send across an
// await).
let (stream_writer, instance, store) =
self.set_up_runtime_environment(matched_route, req, request_body, request_context, global_context, logging_key)?;
self.spawn_wasm_instance(instance, store, stream_writer.clone());

let response = match compose_response(stream_writer).await {
Ok(r) => r,
Err(e) => {
tracing::error!("Error parsing guest output into HTTP response: {}", e);
internal_error("internal error calling application")
}
};

tokio::task::yield_now().await;

Ok(response)
}

fn set_up_runtime_environment(&self, matched_route: &RoutePattern, req: &Parts, request_body: Vec<u8>, request_context: &RequestContext, global_context: &RequestGlobalContext, logging_key: String) -> anyhow::Result<(crate::stream_writer::StreamWriter, Instance, Store<WasiCtx>)> {
let startup_span = tracing::info_span!("module instantiation").entered();

let headers = crate::http_util::build_headers(
matched_route,
req,
body.len(),
request_body.len(),
request_context.client_addr,
global_context.default_host.as_str(),
global_context.use_tls,
&global_context.global_env_vars,
);

let redirects = prepare_stdio_streams(body, global_context, logging_key)?;

let stream_writer = crate::stream_writer::StreamWriter::new();
let redirects = prepare_stdio_streams_for_http(request_body, stream_writer.clone(), global_context, logging_key)?;
let ctx = self.build_wasi_context_for_request(req, headers, redirects.streams)?;

let (store, instance) = self.prepare_wasm_instance(global_context, ctx)?;

// Drop manually to get instantiation time

drop(startup_span);

Ok((stream_writer, instance, store))
}

run_prepared_wasm_instance(instance, store, &self.entrypoint, &self.wasm_module_name)?;

compose_response(redirects.stdout_mutex)
fn spawn_wasm_instance(&self, instance: Instance, store: Store<WasiCtx>, mut stream_writer: StreamWriter) {
let entrypoint = self.entrypoint.clone();
let wasm_module_name = self.wasm_module_name.clone();

// This is fire and forget, so there's a limited amount of error handling we
// can do.
tokio::spawn(async move {
match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) {
Ok(()) => (),
Err(e) => tracing::error!("Error running Wasm module: {}", e),
};
// TODO: should we attempt to write an error response to the StreamWriter here?
match stream_writer.done() {
Ok(()) => (),
Err(e) => tracing::error!("Error marking Wasm output as done: {}", e),
}
});
}

fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap<String, String>, redirects: crate::wasm_module::IOStreamRedirects) -> Result<WasiCtx, Error> {
fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap<String, String>, redirects: crate::wasm_module::IOStreamRedirects<crate::stream_writer::StreamWriter>) -> Result<WasiCtx, Error> {
let uri_path = req.uri.path();
let mut args = vec![uri_path.to_string()];
req.uri
Expand Down Expand Up @@ -110,34 +148,12 @@ impl WasmRouteHandler {
}
}

pub fn compose_response(stdout_mutex: Arc<RwLock<Vec<u8>>>) -> Result<Response<Body>, Error> {
// Okay, once we get here, all the information we need to send back in the response
// should be written to the STDOUT buffer. We fetch that, format it, and send
// it back. In the process, we might need to alter the status code of the result.
//
// This is a little janky, but basically we are looping through the output once,
// looking for the double-newline that distinguishes the headers from the body.
// The headers can then be parsed separately, while the body can be sent back
// to the client.

let out = stdout_mutex.read().unwrap();
let mut last = 0;
let mut scan_headers = true;
let mut buffer: Vec<u8> = Vec::new();
let mut out_headers: Vec<u8> = Vec::new();
out.iter().for_each(|i| {
if scan_headers && *i == 10 && last == 10 {
out_headers.append(&mut buffer);
buffer = Vec::new();
scan_headers = false;
return; // Consume the linefeed
}
last = *i;
buffer.push(*i)
});
let mut res = Response::new(Body::from(buffer));
pub async fn compose_response(mut stream_writer: StreamWriter) -> anyhow::Result<Response<Body>> {
let header_block = stream_writer.header_block().await?;
let mut res = Response::new(Body::wrap_stream(stream_writer.as_stream()));

let mut sufficient_response = false;
parse_cgi_headers(String::from_utf8(out_headers)?)
parse_cgi_headers(String::from_utf8(header_block)?)
.iter()
.for_each(|h| {
use hyper::header::{CONTENT_TYPE, LOCATION};
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub(crate) mod handlers;
mod http_util;
pub (crate) mod module_loader;
mod request;
pub (crate) mod stream_writer;
mod tls;
pub mod version;
pub mod wagi_app;
Expand Down
Loading