Skip to content

Commit

Permalink
Reinstate HTTP headers
Browse files Browse the repository at this point in the history
  • Loading branch information
itowlson committed Nov 21, 2021
1 parent b3b3f89 commit fd92a9e
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 47 deletions.
6 changes: 3 additions & 3 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl RoutingTable {
let request_context = RequestContext {
client_addr,
};
let response = rte.handle_request(&parts, data, &request_context, &self.global_context);
let response = rte.handle_request(&parts, data, &request_context, &self.global_context).await;
Ok(response)
},
Err(_) => Ok(not_found()),
Expand Down Expand Up @@ -149,7 +149,7 @@ impl RoutingTableEntry {
// TODO: I don't think this rightly belongs here. But
// reasonable place to at least understand the decomposition and
// dependencies.
pub fn handle_request(
pub async fn handle_request(
&self,
req: &Parts,
body: Vec<u8>,
Expand All @@ -159,7 +159,7 @@ impl RoutingTableEntry {
match &self.handler_info {
RouteHandler::HealthCheck => Response::new(Body::from("OK")),
RouteHandler::Wasm(w) => {
let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key());
let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()).await;
match response {
Ok(res) => res,
Err(e) => {
Expand Down
76 changes: 32 additions & 44 deletions src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{collections::HashMap};
use std::sync::{Arc, RwLock};

use cap_std::fs::Dir;
use hyper::{
Expand All @@ -16,6 +15,7 @@ use crate::dispatcher::RoutePattern;
use crate::http_util::{internal_error, parse_cgi_headers};
use crate::request::{RequestContext, RequestGlobalContext};

use crate::stream_writer::StreamWriter;
use crate::wasm_module::WasmModuleSource;
use crate::wasm_runner::{prepare_stdio_streams_for_http, prepare_wasm_instance, run_prepared_wasm_instance};

Expand All @@ -36,7 +36,7 @@ pub struct WasmRouteHandler {
}

impl WasmRouteHandler {
pub fn handle_request(
pub async fn handle_request(
&self,
matched_route: &RoutePattern,
req: &Parts,
Expand All @@ -45,7 +45,25 @@ impl WasmRouteHandler {
global_context: &RequestGlobalContext,
logging_key: String,
) -> Result<Response<Body>, anyhow::Error> {

// These broken-out functions are slightly artificial but help solve some lifetime
// issues (where otherwise you get errors about things not being Send across an
// await).
let (stream_writer, instance, store) =
self.set_up_runtime_environment(matched_route, req, request_body, request_context, global_context, logging_key)?;
self.spawn_wasm_instance(instance, store, stream_writer.clone());

let response = compose_response(stream_writer).await?; // TODO: handle errors

// TODO: c'mon man
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;

Ok(response)
}

fn set_up_runtime_environment(&self, matched_route: &RoutePattern, req: &Parts, request_body: Vec<u8>, request_context: &RequestContext, global_context: &RequestGlobalContext, logging_key: String) -> anyhow::Result<(crate::stream_writer::StreamWriter, Instance, Store<WasiCtx>)> {
let startup_span = tracing::info_span!("module instantiation").entered();

let headers = crate::http_util::build_headers(
matched_route,
req,
Expand All @@ -57,33 +75,25 @@ impl WasmRouteHandler {
);

let stream_writer = crate::stream_writer::StreamWriter::new();

let redirects = prepare_stdio_streams_for_http(request_body, stream_writer.clone(), global_context, logging_key)?;

let ctx = self.build_wasi_context_for_request(req, headers, redirects.streams)?;

let (store, instance) = self.prepare_wasm_instance(global_context, ctx)?;

// Drop manually to get instantiation time

drop(startup_span);

Ok((stream_writer, instance, store))
}

fn spawn_wasm_instance(&self, instance: Instance, store: Store<WasiCtx>, mut stream_writer: StreamWriter) {
let entrypoint = self.entrypoint.clone();
let wasm_module_name = self.wasm_module_name.clone();
let mut sw = stream_writer.clone();

tokio::spawn(async move {
match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) {
Ok(()) => sw.done().unwrap(), // TODO: <--
Ok(()) => stream_writer.done().unwrap(), // TODO: <--
Err(e) => tracing::error!("oh no {}", e), // TODO: behaviour? message? MESSAGE, IVAN?!
};
});

let response = Response::new(Body::wrap_stream(stream_writer.as_stream()));
// TODO: c'mon man
std::thread::sleep(std::time::Duration::from_millis(10));

// TODO: headers headers headers

Ok(response)
}

fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap<String, String>, redirects: crate::wasm_module::IOStreamRedirects<crate::stream_writer::StreamWriter>) -> Result<WasiCtx, Error> {
Expand Down Expand Up @@ -132,34 +142,12 @@ impl WasmRouteHandler {
}
}

pub fn compose_response(stdout_mutex: Arc<RwLock<Vec<u8>>>) -> Result<Response<Body>, Error> {
// Okay, once we get here, all the information we need to send back in the response
// should be written to the STDOUT buffer. We fetch that, format it, and send
// it back. In the process, we might need to alter the status code of the result.
//
// This is a little janky, but basically we are looping through the output once,
// looking for the double-newline that distinguishes the headers from the body.
// The headers can then be parsed separately, while the body can be sent back
// to the client.

let out = stdout_mutex.read().unwrap();
let mut last = 0;
let mut scan_headers = true;
let mut buffer: Vec<u8> = Vec::new();
let mut out_headers: Vec<u8> = Vec::new();
out.iter().for_each(|i| {
if scan_headers && *i == 10 && last == 10 {
out_headers.append(&mut buffer);
buffer = Vec::new();
scan_headers = false;
return; // Consume the linefeed
}
last = *i;
buffer.push(*i)
});
let mut res = Response::new(Body::from(buffer));
pub async fn compose_response(mut stream_writer: StreamWriter) -> anyhow::Result<Response<Body>> {
let header_block = stream_writer.header_block().await?;
let mut res = Response::new(Body::wrap_stream(stream_writer.as_stream()));

let mut sufficient_response = false;
parse_cgi_headers(String::from_utf8(out_headers)?)
parse_cgi_headers(String::from_utf8(header_block)?)
.iter()
.for_each(|h| {
use hyper::header::{CONTENT_TYPE, LOCATION};
Expand Down
62 changes: 62 additions & 0 deletions src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ impl StreamWriter {
}
}

pub async fn header_block(&mut self) -> anyhow::Result<Vec<u8>> {
loop {
match self.pending.write().as_deref_mut() {
Ok(pending) => match split_at_two_newlines(&pending) {
None => (),
Some((header_block, rest)) => {
*pending = rest;
return Ok(header_block);
}
},
Err(e) => {
return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e));
},
}
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
}
}

pub fn as_stream(mut self) -> impl futures_core::stream::Stream<Item = anyhow::Result<Vec<u8>>> {
stream! {
loop {
Expand Down Expand Up @@ -132,3 +150,47 @@ impl Write for StreamWriter {
Ok(())
}
}

fn split_at_two_newlines(source: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
let mut buffer = vec![];
let mut last: u8 = 0;
for value in source {
if *value == 10 && last == 10 {
let rest_slice = &source[(buffer.len() + 1)..];
let rest = Vec::from(rest_slice);
return Some((buffer, rest));
} else {
buffer.push(*value);
last = *value;
}
}
None
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn splits_at_two_newlines_if_pair_only() {
let source: Vec<u8> = vec![0x41, 0x42, 0x0a, 0x0a, 0x43, 0x44];
let result = split_at_two_newlines(&source).expect("did not split at all");
assert_eq!(vec![0x41, 0x42, 0x0a], result.0);
assert_eq!(vec![0x43, 0x44], result.1);
}

#[test]
fn doesnt_splits_at_two_newlines_if_no_pair() {
let source: Vec<u8> = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x45, 0x46];
let result = split_at_two_newlines(&source);
assert_eq!(None, result);
}

#[test]
fn splits_at_two_newlines_empty_rest_if_at_end() {
let source: Vec<u8> = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x0a];
let result = split_at_two_newlines(&source).expect("did not split at all");
assert_eq!(vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a], result.0);
assert!(result.1.is_empty());
}
}

0 comments on commit fd92a9e

Please sign in to comment.