diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 64cb36f..ca48594 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -58,7 +58,7 @@ impl RoutingTable { let request_context = RequestContext { client_addr, }; - let response = rte.handle_request(&parts, data, &request_context, &self.global_context); + let response = rte.handle_request(&parts, data, &request_context, &self.global_context).await; Ok(response) }, Err(_) => Ok(not_found()), @@ -149,7 +149,7 @@ impl RoutingTableEntry { // TODO: I don't think this rightly belongs here. But // reasonable place to at least understand the decomposition and // dependencies. - pub fn handle_request( + pub async fn handle_request( &self, req: &Parts, body: Vec, @@ -159,7 +159,7 @@ impl RoutingTableEntry { match &self.handler_info { RouteHandler::HealthCheck => Response::new(Body::from("OK")), RouteHandler::Wasm(w) => { - let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()); + let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()).await; match response { Ok(res) => res, Err(e) => { diff --git a/src/handlers.rs b/src/handlers.rs index 391977f..67188ef 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,5 +1,4 @@ use std::{collections::HashMap}; -use std::sync::{Arc, RwLock}; use cap_std::fs::Dir; use hyper::{ @@ -16,6 +15,7 @@ use crate::dispatcher::RoutePattern; use crate::http_util::{internal_error, parse_cgi_headers}; use crate::request::{RequestContext, RequestGlobalContext}; +use crate::stream_writer::StreamWriter; use crate::wasm_module::WasmModuleSource; use crate::wasm_runner::{prepare_stdio_streams_for_http, prepare_wasm_instance, run_prepared_wasm_instance}; @@ -36,7 +36,7 @@ pub struct WasmRouteHandler { } impl WasmRouteHandler { - pub fn handle_request( + pub async fn handle_request( &self, matched_route: &RoutePattern, req: &Parts, @@ -45,7 +45,25 @@ impl WasmRouteHandler { global_context: &RequestGlobalContext, logging_key: String, ) -> Result, anyhow::Error> { + + // These broken-out functions are slightly artificial but help solve some lifetime + // issues (where otherwise you get errors about things not being Send across an + // await). + let (stream_writer, instance, store) = + self.set_up_runtime_environment(matched_route, req, request_body, request_context, global_context, logging_key)?; + self.spawn_wasm_instance(instance, store, stream_writer.clone()); + + let response = compose_response(stream_writer).await?; // TODO: handle errors + + // TODO: c'mon man + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + + Ok(response) + } + + fn set_up_runtime_environment(&self, matched_route: &RoutePattern, req: &Parts, request_body: Vec, request_context: &RequestContext, global_context: &RequestGlobalContext, logging_key: String) -> anyhow::Result<(crate::stream_writer::StreamWriter, Instance, Store)> { let startup_span = tracing::info_span!("module instantiation").entered(); + let headers = crate::http_util::build_headers( matched_route, req, @@ -57,33 +75,25 @@ impl WasmRouteHandler { ); let stream_writer = crate::stream_writer::StreamWriter::new(); - let redirects = prepare_stdio_streams_for_http(request_body, stream_writer.clone(), global_context, logging_key)?; - let ctx = self.build_wasi_context_for_request(req, headers, redirects.streams)?; - let (store, instance) = self.prepare_wasm_instance(global_context, ctx)?; - - // Drop manually to get instantiation time + drop(startup_span); + + Ok((stream_writer, instance, store)) + } + fn spawn_wasm_instance(&self, instance: Instance, store: Store, mut stream_writer: StreamWriter) { let entrypoint = self.entrypoint.clone(); let wasm_module_name = self.wasm_module_name.clone(); - let mut sw = stream_writer.clone(); + tokio::spawn(async move { match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) { - Ok(()) => sw.done().unwrap(), // TODO: <-- + Ok(()) => stream_writer.done().unwrap(), // TODO: <-- Err(e) => tracing::error!("oh no {}", e), // TODO: behaviour? message? MESSAGE, IVAN?! }; }); - - let response = Response::new(Body::wrap_stream(stream_writer.as_stream())); - // TODO: c'mon man - std::thread::sleep(std::time::Duration::from_millis(10)); - - // TODO: headers headers headers - - Ok(response) } fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap, redirects: crate::wasm_module::IOStreamRedirects) -> Result { @@ -132,34 +142,12 @@ impl WasmRouteHandler { } } -pub fn compose_response(stdout_mutex: Arc>>) -> Result, Error> { - // Okay, once we get here, all the information we need to send back in the response - // should be written to the STDOUT buffer. We fetch that, format it, and send - // it back. In the process, we might need to alter the status code of the result. - // - // This is a little janky, but basically we are looping through the output once, - // looking for the double-newline that distinguishes the headers from the body. - // The headers can then be parsed separately, while the body can be sent back - // to the client. - - let out = stdout_mutex.read().unwrap(); - let mut last = 0; - let mut scan_headers = true; - let mut buffer: Vec = Vec::new(); - let mut out_headers: Vec = Vec::new(); - out.iter().for_each(|i| { - if scan_headers && *i == 10 && last == 10 { - out_headers.append(&mut buffer); - buffer = Vec::new(); - scan_headers = false; - return; // Consume the linefeed - } - last = *i; - buffer.push(*i) - }); - let mut res = Response::new(Body::from(buffer)); +pub async fn compose_response(mut stream_writer: StreamWriter) -> anyhow::Result> { + let header_block = stream_writer.header_block().await?; + let mut res = Response::new(Body::wrap_stream(stream_writer.as_stream())); + let mut sufficient_response = false; - parse_cgi_headers(String::from_utf8(out_headers)?) + parse_cgi_headers(String::from_utf8(header_block)?) .iter() .for_each(|h| { use hyper::header::{CONTENT_TYPE, LOCATION}; diff --git a/src/stream_writer.rs b/src/stream_writer.rs index ca1a990..9634cd3 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -54,6 +54,24 @@ impl StreamWriter { } } + pub async fn header_block(&mut self) -> anyhow::Result> { + loop { + match self.pending.write().as_deref_mut() { + Ok(pending) => match split_at_two_newlines(&pending) { + None => (), + Some((header_block, rest)) => { + *pending = rest; + return Ok(header_block); + } + }, + Err(e) => { + return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e)); + }, + } + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + } + } + pub fn as_stream(mut self) -> impl futures_core::stream::Stream>> { stream! { loop { @@ -132,3 +150,47 @@ impl Write for StreamWriter { Ok(()) } } + +fn split_at_two_newlines(source: &[u8]) -> Option<(Vec, Vec)> { + let mut buffer = vec![]; + let mut last: u8 = 0; + for value in source { + if *value == 10 && last == 10 { + let rest_slice = &source[(buffer.len() + 1)..]; + let rest = Vec::from(rest_slice); + return Some((buffer, rest)); + } else { + buffer.push(*value); + last = *value; + } + } + None +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn splits_at_two_newlines_if_pair_only() { + let source: Vec = vec![0x41, 0x42, 0x0a, 0x0a, 0x43, 0x44]; + let result = split_at_two_newlines(&source).expect("did not split at all"); + assert_eq!(vec![0x41, 0x42, 0x0a], result.0); + assert_eq!(vec![0x43, 0x44], result.1); + } + + #[test] + fn doesnt_splits_at_two_newlines_if_no_pair() { + let source: Vec = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x45, 0x46]; + let result = split_at_two_newlines(&source); + assert_eq!(None, result); + } + + #[test] + fn splits_at_two_newlines_empty_rest_if_at_end() { + let source: Vec = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x0a]; + let result = split_at_two_newlines(&source).expect("did not split at all"); + assert_eq!(vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a], result.0); + assert!(result.1.is_empty()); + } +}