From e3a5eacdacf1d28ebcae8d562afd3065b07480e0 Mon Sep 17 00:00:00 2001 From: itowlson Date: Thu, 18 Nov 2021 17:33:37 +1300 Subject: [PATCH 1/7] IT LIVES! But is horrible. But anyway it's a checkpoint --- Cargo.lock | 30 ++++---- Cargo.toml | 4 +- src/dispatcher.rs | 2 +- src/handlers.rs | 30 ++++++-- src/lib.rs | 1 + src/stream_writer.rs | 106 +++++++++++++++++++++++++++++ src/wasm_module.rs | 12 ++-- src/wasm_runner.rs | 158 ++++++++++++++++++++++++++++++++++++++++++- 8 files changed, 313 insertions(+), 30 deletions(-) create mode 100644 src/stream_writer.rs diff --git a/Cargo.lock b/Cargo.lock index f231125..e0ba60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -785,9 +785,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" +checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" dependencies = [ "futures-core", "futures-sink", @@ -795,9 +795,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" [[package]] name = "futures-executor" @@ -812,15 +812,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" [[package]] name = "futures-macro" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" +checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb" dependencies = [ "autocfg", "proc-macro-hack", @@ -831,21 +831,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282" +checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11" [[package]] name = "futures-task" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" +checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" [[package]] name = "futures-util" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" +checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" dependencies = [ "autocfg", "futures-channel", @@ -2721,6 +2721,8 @@ dependencies = [ "docker_credential", "env-file-reader", "futures", + "futures-core", + "futures-util", "hyper", "indexmap", "oci-distribution", diff --git a/Cargo.toml b/Cargo.toml index 0163913..95e5520 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ [dependencies] anyhow = "1.0" - async-stream = "0.3" + async-stream = "0.3.2" async-trait = "0.1" bindle = { version = "0.3", default-features = false, features = ["client", "server", 
"caching"] } cap-std = "^0.22" @@ -36,3 +36,5 @@ wasmtime-cache = "0.33" wat = "1.0.37" chrono = "0.4.19" +futures-util = "0.3.17" +futures-core = "0.3.17" diff --git a/src/dispatcher.rs b/src/dispatcher.rs index ed81615..96a53b2 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -352,7 +352,7 @@ fn append_one_dynamic_route(routing_table_entry: &RoutingTableEntry, wasm_route_ } } -fn build_wasi_context_for_dynamic_route_query(redirects: crate::wasm_module::IOStreamRedirects) -> wasi_common::WasiCtx { +fn build_wasi_context_for_dynamic_route_query(redirects: crate::wasm_module::IOStreamRedirects>) -> wasi_common::WasiCtx { let builder = wasi_cap_std_sync::WasiCtxBuilder::new() .stderr(Box::new(redirects.stderr)) .stdout(Box::new(redirects.stdout)); diff --git a/src/handlers.rs b/src/handlers.rs index 9c9e174..28ec002 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,3 +1,4 @@ +use std::convert::Infallible; use std::{collections::HashMap}; use std::sync::{Arc, RwLock}; @@ -17,7 +18,7 @@ use crate::http_util::{internal_error, parse_cgi_headers}; use crate::request::{RequestContext, RequestGlobalContext}; use crate::wasm_module::WasmModuleSource; -use crate::wasm_runner::{prepare_stdio_streams, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions}; +use crate::wasm_runner::{SenderPlusPlus2, prepare_stdio_streams, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions}; #[derive(Clone, Debug)] pub enum RouteHandler { @@ -40,7 +41,7 @@ impl WasmRouteHandler { &self, matched_route: &RoutePattern, req: &Parts, - body: Vec, + request_body: Vec, request_context: &RequestContext, global_context: &RequestGlobalContext, logging_key: String, @@ -49,14 +50,16 @@ impl WasmRouteHandler { let headers = crate::http_util::build_headers( matched_route, req, - body.len(), + request_body.len(), request_context.client_addr, global_context.default_host.as_str(), global_context.use_tls, &global_context.global_env_vars, ); - let redirects = prepare_stdio_streams(body, global_context, logging_key)?; + let stream_writer = crate::stream_writer::StreamWriter::new(); + + let redirects = prepare_stdio_streams_for_http(request_body, stream_writer.clone(), global_context, logging_key)?; let ctx = self.build_wasi_context_for_request(req, headers, redirects.streams)?; @@ -65,12 +68,25 @@ impl WasmRouteHandler { // Drop manually to get instantiation time drop(startup_span); - run_prepared_wasm_instance(instance, store, &self.entrypoint, &self.wasm_module_name)?; + let entrypoint = self.entrypoint.clone(); + let wasm_module_name = self.wasm_module_name.clone(); + let mut sw = stream_writer.clone(); + tokio::spawn(async move { + match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) { + Ok(()) => sw.done().unwrap(), // TODO: <-- + Err(e) => tracing::error!("oh no {}", e), + }; + }); + + // compose_response(redirects.stdout_mutex) + let response = Response::new(Body::wrap_stream(stream_writer.as_stream())); + // tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await; + std::thread::sleep(std::time::Duration::from_millis(10)); - compose_response(redirects.stdout_mutex) + Ok(response) } - fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap, redirects: crate::wasm_module::IOStreamRedirects) -> Result { + fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap, redirects: crate::wasm_module::IOStreamRedirects) -> Result { let uri_path = req.uri.path(); let mut args = vec![uri_path.to_string()]; req.uri diff --git a/src/lib.rs 
b/src/lib.rs index 88353de..404ce8c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,6 +6,7 @@ pub(crate) mod handlers;
 mod http_util;
 pub (crate) mod module_loader;
 mod request;
+pub (crate) mod stream_writer;
 mod tls;
 pub mod version;
 pub mod wagi_app;
diff --git a/src/stream_writer.rs b/src/stream_writer.rs
new file mode 100644
index 0000000..eee9c99
--- /dev/null
+++ b/src/stream_writer.rs
@@ -0,0 +1,106 @@
+use std::{convert::Infallible, io::Write, sync::{Arc, RwLock}};
+
+use async_stream::stream;
+
+#[derive(Clone)]
+pub struct StreamWriter {
+    pending: Arc<RwLock<Vec<u8>>>,
+    done: Arc<RwLock<bool>>,
+}
+
+impl StreamWriter {
+    pub fn new() -> Self {
+        Self {
+            pending: Arc::new(RwLock::new(vec![])),
+            done: Arc::new(RwLock::new(false)),
+        }
+    }
+
+    fn append(&mut self, buf: &[u8]) -> anyhow::Result<()> {
+        match self.pending.write().as_mut() {
+            Ok(pending) => {
+                pending.extend_from_slice(buf);
+                Ok(())
+            },
+            Err(e) =>
+                Err(anyhow::anyhow!("Can't append to W2 buffer: {}", e))
+        }
+    }
+
+    pub fn done(&mut self) -> anyhow::Result<()> {
+        match self.done.write().as_deref_mut() {
+            Ok(d) => {
+                *d = true;
+                println!("marked done");
+                Ok(())
+            },
+            Err(e) =>
+                Err(anyhow::anyhow!("Can't done the W2: {}", e))
+        }
+    }
+
+    pub fn as_stream(mut self) -> impl futures_core::stream::Stream<Item = Result<Vec<u8>, Infallible>> {
+        stream! {
+            loop {
+                let data = self.pop();
+                match data {
+                    Ok(v) => {
+                        println!("yielding {} bytes", v.len());
+                        if v.is_empty() {
+                            if self.is_done() {
+                                println!("doneburger");
+                                return;
+                            } else {
+                                tokio::time::sleep(tokio::time::Duration::from_micros(20)).await;
+                            }
+                        } else {
+                            yield Ok(v);
+                        }
+                    },
+                    Err(e) => {
+                        if self.is_done() {
+                            println!("done!!!!");
+                            return;
+                        } else {
+                            ()
+                        }
+                    },
+                }
+            }
+        }
+    }
+
+    fn is_done(&self) -> bool {
+        match self.done.read() {
+            Ok(d) => *d,
+            Err(_) => false,
+        }
+    }
+
+    fn pop(&mut self) -> anyhow::Result<Vec<u8>> {
+        let data = match self.pending.write().as_mut() {
+            Ok(pending) => {
+                let res = Ok(pending.clone());
+                pending.clear();
+                res
+            },
+            Err(e) => {
+                Err(anyhow::anyhow!("Error gaining write access: {}", e))
+            }
+        };
+        data
+    }
+}
+
+impl Write for StreamWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.append(buf).map_err(
+            |e| std::io::Error::new(std::io::ErrorKind::Other, e)
+        )?;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
diff --git a/src/wasm_module.rs b/src/wasm_module.rs
index 7bad77c..ed76759 100644
--- a/src/wasm_module.rs
+++ b/src/wasm_module.rs
@@ -1,4 +1,4 @@
-use std::{fmt::Debug, sync::{Arc, RwLock}};
+use std::{fmt::Debug, io::Write, sync::{Arc, RwLock}};
 
 use wasi_common::pipe::{ReadPipe, WritePipe};
 use wasmtime::*;
@@ -31,13 +31,13 @@ impl Debug for WasmModuleSource {
 // constraints from the stdout_mutex. Not sure how to do this better.
 // (I don't want to .clone() the fields even though that would work,
 // because that is misleading about the semantics.)
-pub struct IOStreamRedirects { +pub struct IOStreamRedirects { pub stdin: ReadPipe>>, - pub stdout: WritePipe>, + pub stdout: WritePipe, pub stderr: wasi_cap_std_sync::file::File, } -pub struct IORedirectionInfo { - pub streams: IOStreamRedirects, - pub stdout_mutex: Arc>>, +pub struct IORedirectionInfo { + pub streams: IOStreamRedirects, + pub stdout_mutex: Arc>, } diff --git a/src/wasm_runner.rs b/src/wasm_runner.rs index 3b0152a..22757cc 100644 --- a/src/wasm_runner.rs +++ b/src/wasm_runner.rs @@ -1,3 +1,5 @@ +use std::convert::Infallible; +use std::io::Write; use std::path::Path; use std::sync::{Arc, RwLock}; @@ -47,7 +49,7 @@ pub fn prepare_stdio_streams( body: Vec, global_context: &RequestGlobalContext, handler_id: String, -) -> Result { +) -> Result>, Error> { let stdin = ReadPipe::from(body); let stdout_buf: Vec = vec![]; let stdout_mutex = Arc::new(RwLock::new(stdout_buf)); @@ -78,6 +80,160 @@ pub fn prepare_stdio_streams( }) } +pub fn prepare_stdio_streams_for_http( + body: Vec, + stream_writer: crate::stream_writer::StreamWriter, + global_context: &RequestGlobalContext, + handler_id: String, +) -> Result, Error> { + let stdin = ReadPipe::from(body); + let stdout_mutex = Arc::new(RwLock::new(stream_writer)); + let stdout = WritePipe::from_shared(stdout_mutex.clone()); + let log_dir = global_context.base_log_dir.join(handler_id); + + // The spec does not say what to do with STDERR. + // See specifically sections 4.2 and 6.1 of RFC 3875. + // Currently, we will attach to wherever logs go. + tracing::info!(log_dir = %log_dir.display(), "Using log dir"); + std::fs::create_dir_all(&log_dir)?; + let stderr = cap_std::fs::File::from_std( + std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(log_dir.join(STDERR_FILE))?, + ambient_authority(), + ); + let stderr = wasi_cap_std_sync::file::File::from_cap_std(stderr); + + Ok(crate::wasm_module::IORedirectionInfo { + streams: crate::wasm_module::IOStreamRedirects { + stdin, + stdout, + stderr, + }, + stdout_mutex, + }) +} + +pub struct SenderPlusPlus2 { + sender: futures::channel::mpsc::Sender, Infallible>>, +} + +impl Write for SenderPlusPlus2 { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + println!("write {} bytes", buf.len()); + let len = buf.len(); + let mut v = Vec::with_capacity(len); + v.extend_from_slice(buf); + loop { + match self.sender.try_send(Ok(v)) { + Ok(_) => break, + Err(e) => { + println!("err"); + if e.is_full() { + println!("retry time!"); + v = e.into_inner().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } else { + return Err(std::io::Error::new(std::io::ErrorKind::Other, e)); + } + }, + } + } + Ok(len) + } + + fn flush(&mut self) -> std::io::Result<()> { + println!("FLUSHY FLUSHY"); + Ok(()) + } +} + +// pub struct BufflesMcPuffles { +// pending: Vec>, +// write_completed: bool, +// } + +// impl BufflesMcPuffles { +// pub fn mark_finished(&mut self) { +// self.write_completed = true; +// } +// } + +// impl Write for BufflesMcPuffles { +// fn write(&mut self, buf: &[u8]) -> std::io::Result { +// let mut v = vec![]; +// for b in buf { v.push(*b); } // has to be a better way +// self.pending.push(v); +// Ok(buf.len()) +// } + +// fn flush(&mut self) -> std::io::Result<()> { +// Ok(()) +// } +// } + +// impl futures_core::Stream for BufflesMcPuffles { +// type Item = Vec; + +// fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { +// let mut pending = self.pending; +// match self.pending.pop() { +// None => if 
self.write_completed { +// std::task::Poll::Ready(None) +// } else { +// std::task::Poll::Pending +// }, +// Some(v) => std::task::Poll::Ready(Some(v)), +// } +// } +// } + +// fn make_write_pipe_over_sender(sender: hyper::body::Sender) -> Arc> { // WritePipe { +// let spp = SenderPlusPlus { sender }; +// Arc::new(RwLock::new(spp)) +// // let spp_shared = Arc::new(RwLock::new(spp)); +// // WritePipe::from_shared(spp_shared) +// } + +// pub struct SenderPlusPlus { +// sender: hyper::body::Sender, +// } + +// impl std::io::Write for SenderPlusPlus { +// fn write(&mut self, buf: &[u8]) -> std::io::Result { +// let len = buf.len(); +// let bytes = hyper::body::Bytes::copy_from_slice(buf); +// // let rt = tokio::runtime::Runtime::new().unwrap(); // TODO +// // let res = rt.block_on(async move { +// // self.sender.send_data(bytes).await +// // }).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + +// // let jh = tokio::spawn(async move { +// // self.sender.send_data(bytes).await +// // }); +// // tokio::task::spawn_blocking(jh); + +// let content = String::from_utf8_lossy(&bytes); +// tracing::error!("Response chunk: '{}'", content); + +// self.sender.try_send_data(bytes) +// .map_err(|b| { +// let content = String::from_utf8_lossy(&b); +// std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("Sender could not accept moar bytes '{}'", content)) +// })?; + +// tracing::error!("...sent okay"); + +// Ok(len) +// } + +// fn flush(&mut self) -> std::io::Result<()> { +// Ok(()) // ? +// } +// } + pub fn new_store_and_engine( cache_config_path: &Path, ctx: WasiCtx, From 34480ae2098d2fce587902fd17490cdfce764d2f Mon Sep 17 00:00:00 2001 From: itowlson Date: Fri, 19 Nov 2021 12:51:19 +1300 Subject: [PATCH 2/7] Remove a bunch of dead code. Farewell, BufflesMcPuffles. Your service will not be forgotten. --- src/handlers.rs | 10 ++-- src/stream_writer.rs | 56 +++++++++++++++----- src/wasm_runner.rs | 122 ------------------------------------------- 3 files changed, 47 insertions(+), 141 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index 28ec002..1ae6cdf 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,4 +1,3 @@ -use std::convert::Infallible; use std::{collections::HashMap}; use std::sync::{Arc, RwLock}; @@ -18,7 +17,7 @@ use crate::http_util::{internal_error, parse_cgi_headers}; use crate::request::{RequestContext, RequestGlobalContext}; use crate::wasm_module::WasmModuleSource; -use crate::wasm_runner::{SenderPlusPlus2, prepare_stdio_streams, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions}; +use crate::wasm_runner::{prepare_stdio_streams_for_http, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions}; #[derive(Clone, Debug)] pub enum RouteHandler { @@ -74,15 +73,16 @@ impl WasmRouteHandler { tokio::spawn(async move { match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) { Ok(()) => sw.done().unwrap(), // TODO: <-- - Err(e) => tracing::error!("oh no {}", e), + Err(e) => tracing::error!("oh no {}", e), // TODO: behaviour? message? MESSAGE, IVAN?! 
}; }); - // compose_response(redirects.stdout_mutex) let response = Response::new(Body::wrap_stream(stream_writer.as_stream())); - // tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await; + // TODO: c'mon man std::thread::sleep(std::time::Duration::from_millis(10)); + // TODO: headers headers headers + Ok(response) } diff --git a/src/stream_writer.rs b/src/stream_writer.rs index eee9c99..ca1a990 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, io::Write, sync::{Arc, RwLock}}; +use std::{io::Write, sync::{Arc, RwLock}}; use async_stream::stream; @@ -6,52 +6,79 @@ use async_stream::stream; pub struct StreamWriter { pending: Arc>>, done: Arc>, + // A way for the write side to signal new data to the stream side + write_index: Arc>, + write_index_sender: Arc>, + write_index_receiver: tokio::sync::watch::Receiver, } impl StreamWriter { pub fn new() -> Self { + let write_index = 0; + let (tx, rx) = tokio::sync::watch::channel(write_index); Self { pending: Arc::new(RwLock::new(vec![])), done: Arc::new(RwLock::new(false)), + write_index: Arc::new(RwLock::new(write_index)), + write_index_sender: Arc::new(tx), + write_index_receiver: rx, } } fn append(&mut self, buf: &[u8]) -> anyhow::Result<()> { - match self.pending.write().as_mut() { + let result = match self.pending.write().as_mut() { Ok(pending) => { pending.extend_from_slice(buf); Ok(()) }, Err(e) => - Err(anyhow::anyhow!("Can't append to W2 buffer: {}", e)) + Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) + }; + { + let mut write_index = self.write_index.write().unwrap(); + *write_index = *write_index + 1; + self.write_index_sender.send(*write_index).unwrap(); } + result } pub fn done(&mut self) -> anyhow::Result<()> { match self.done.write().as_deref_mut() { Ok(d) => { *d = true; - println!("marked done"); Ok(()) }, Err(e) => - Err(anyhow::anyhow!("Can't done the W2: {}", e)) + Err(anyhow::anyhow!("Internal error: StreamWriter::done can't take lock: {}", e)) + } } - pub fn as_stream(mut self) -> impl futures_core::stream::Stream, Infallible>> { + pub fn as_stream(mut self) -> impl futures_core::stream::Stream>> { stream! { loop { let data = self.pop(); match data { Ok(v) => { - println!("yielding {} bytes", v.len()); if v.is_empty() { if self.is_done() { - println!("doneburger"); return; } else { - tokio::time::sleep(tokio::time::Duration::from_micros(20)).await; + // Not sure this is the smoothest way to do it. The oldest way was: + // tokio::time::sleep(tokio::time::Duration::from_micros(20)).await; + // which is a hideous kludge but subjectively felt quicker (but the + // number say not, so what is truth anyway) + match self.write_index_receiver.changed().await { + Ok(_) => continue, + Err(e) => { + // If this ever happens (which it, cough, shouldn't), it means all senders have + // closed, which _should_ mean we are done. Log the error + // but don't return it to the stream: the response as streamed so far + // _should_ be okay! 
+ tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); + return; + } + } } } else { yield Ok(v); @@ -59,10 +86,10 @@ impl StreamWriter { }, Err(e) => { if self.is_done() { - println!("done!!!!"); return; } else { - () + yield Err(e); + return; } }, } @@ -80,12 +107,12 @@ impl StreamWriter { fn pop(&mut self) -> anyhow::Result> { let data = match self.pending.write().as_mut() { Ok(pending) => { - let res = Ok(pending.clone()); + let res = pending.clone(); pending.clear(); - res + Ok(res) }, Err(e) => { - Err(anyhow::anyhow!("Error gaining write access: {}", e)) + Err(anyhow::anyhow!("Internal error: StreamWriter::pop can't take lock: {}", e)) } }; data @@ -97,6 +124,7 @@ impl Write for StreamWriter { self.append(buf).map_err( |e| std::io::Error::new(std::io::ErrorKind::Other, e) )?; + Ok(buf.len()) } diff --git a/src/wasm_runner.rs b/src/wasm_runner.rs index 22757cc..b8bc939 100644 --- a/src/wasm_runner.rs +++ b/src/wasm_runner.rs @@ -1,5 +1,3 @@ -use std::convert::Infallible; -use std::io::Write; use std::path::Path; use std::sync::{Arc, RwLock}; @@ -101,7 +99,6 @@ pub fn prepare_stdio_streams_for_http( .append(true) .create(true) .open(log_dir.join(STDERR_FILE))?, - ambient_authority(), ); let stderr = wasi_cap_std_sync::file::File::from_cap_std(stderr); @@ -115,125 +112,6 @@ pub fn prepare_stdio_streams_for_http( }) } -pub struct SenderPlusPlus2 { - sender: futures::channel::mpsc::Sender, Infallible>>, -} - -impl Write for SenderPlusPlus2 { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - println!("write {} bytes", buf.len()); - let len = buf.len(); - let mut v = Vec::with_capacity(len); - v.extend_from_slice(buf); - loop { - match self.sender.try_send(Ok(v)) { - Ok(_) => break, - Err(e) => { - println!("err"); - if e.is_full() { - println!("retry time!"); - v = e.into_inner().unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - continue; - } else { - return Err(std::io::Error::new(std::io::ErrorKind::Other, e)); - } - }, - } - } - Ok(len) - } - - fn flush(&mut self) -> std::io::Result<()> { - println!("FLUSHY FLUSHY"); - Ok(()) - } -} - -// pub struct BufflesMcPuffles { -// pending: Vec>, -// write_completed: bool, -// } - -// impl BufflesMcPuffles { -// pub fn mark_finished(&mut self) { -// self.write_completed = true; -// } -// } - -// impl Write for BufflesMcPuffles { -// fn write(&mut self, buf: &[u8]) -> std::io::Result { -// let mut v = vec![]; -// for b in buf { v.push(*b); } // has to be a better way -// self.pending.push(v); -// Ok(buf.len()) -// } - -// fn flush(&mut self) -> std::io::Result<()> { -// Ok(()) -// } -// } - -// impl futures_core::Stream for BufflesMcPuffles { -// type Item = Vec; - -// fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { -// let mut pending = self.pending; -// match self.pending.pop() { -// None => if self.write_completed { -// std::task::Poll::Ready(None) -// } else { -// std::task::Poll::Pending -// }, -// Some(v) => std::task::Poll::Ready(Some(v)), -// } -// } -// } - -// fn make_write_pipe_over_sender(sender: hyper::body::Sender) -> Arc> { // WritePipe { -// let spp = SenderPlusPlus { sender }; -// Arc::new(RwLock::new(spp)) -// // let spp_shared = Arc::new(RwLock::new(spp)); -// // WritePipe::from_shared(spp_shared) -// } - -// pub struct SenderPlusPlus { -// sender: hyper::body::Sender, -// } - -// impl std::io::Write for SenderPlusPlus { -// fn write(&mut self, buf: &[u8]) -> std::io::Result { -// let len = buf.len(); -// let bytes = 
hyper::body::Bytes::copy_from_slice(buf); -// // let rt = tokio::runtime::Runtime::new().unwrap(); // TODO -// // let res = rt.block_on(async move { -// // self.sender.send_data(bytes).await -// // }).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - -// // let jh = tokio::spawn(async move { -// // self.sender.send_data(bytes).await -// // }); -// // tokio::task::spawn_blocking(jh); - -// let content = String::from_utf8_lossy(&bytes); -// tracing::error!("Response chunk: '{}'", content); - -// self.sender.try_send_data(bytes) -// .map_err(|b| { -// let content = String::from_utf8_lossy(&b); -// std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("Sender could not accept moar bytes '{}'", content)) -// })?; - -// tracing::error!("...sent okay"); - -// Ok(len) -// } - -// fn flush(&mut self) -> std::io::Result<()> { -// Ok(()) // ? -// } -// } - pub fn new_store_and_engine( cache_config_path: &Path, ctx: WasiCtx, From 13c06a13ed577a3f9fcddece2ad619c7ed9117c3 Mon Sep 17 00:00:00 2001 From: itowlson Date: Mon, 22 Nov 2021 09:22:00 +1300 Subject: [PATCH 3/7] Reinstate HTTP headers --- src/dispatcher.rs | 6 ++-- src/handlers.rs | 76 +++++++++++++++++++------------------------- src/stream_writer.rs | 62 ++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 47 deletions(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 96a53b2..f69eca1 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -58,7 +58,7 @@ impl RoutingTable { let request_context = RequestContext { client_addr, }; - let response = rte.handle_request(&parts, data, &request_context, &self.global_context); + let response = rte.handle_request(&parts, data, &request_context, &self.global_context).await; Ok(response) }, Err(_) => Ok(not_found()), @@ -149,7 +149,7 @@ impl RoutingTableEntry { // TODO: I don't think this rightly belongs here. But // reasonable place to at least understand the decomposition and // dependencies. 
- pub fn handle_request( + pub async fn handle_request( &self, req: &Parts, body: Vec, @@ -159,7 +159,7 @@ impl RoutingTableEntry { match &self.handler_info { RouteHandler::HealthCheck => Response::new(Body::from("OK")), RouteHandler::Wasm(w) => { - let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()); + let response = w.handle_request(&self.route_pattern, req, body, request_context, global_context, self.unique_key()).await; match response { Ok(res) => res, Err(e) => { diff --git a/src/handlers.rs b/src/handlers.rs index 1ae6cdf..26b11d3 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,5 +1,4 @@ use std::{collections::HashMap}; -use std::sync::{Arc, RwLock}; use wasi_cap_std_sync::Dir; use hyper::{ @@ -16,6 +15,7 @@ use crate::dispatcher::RoutePattern; use crate::http_util::{internal_error, parse_cgi_headers}; use crate::request::{RequestContext, RequestGlobalContext}; +use crate::stream_writer::StreamWriter; use crate::wasm_module::WasmModuleSource; use crate::wasm_runner::{prepare_stdio_streams_for_http, prepare_wasm_instance, run_prepared_wasm_instance, WasmLinkOptions}; @@ -36,7 +36,7 @@ pub struct WasmRouteHandler { } impl WasmRouteHandler { - pub fn handle_request( + pub async fn handle_request( &self, matched_route: &RoutePattern, req: &Parts, @@ -45,7 +45,25 @@ impl WasmRouteHandler { global_context: &RequestGlobalContext, logging_key: String, ) -> Result, anyhow::Error> { + + // These broken-out functions are slightly artificial but help solve some lifetime + // issues (where otherwise you get errors about things not being Send across an + // await). + let (stream_writer, instance, store) = + self.set_up_runtime_environment(matched_route, req, request_body, request_context, global_context, logging_key)?; + self.spawn_wasm_instance(instance, store, stream_writer.clone()); + + let response = compose_response(stream_writer).await?; // TODO: handle errors + + // TODO: c'mon man + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + + Ok(response) + } + + fn set_up_runtime_environment(&self, matched_route: &RoutePattern, req: &Parts, request_body: Vec, request_context: &RequestContext, global_context: &RequestGlobalContext, logging_key: String) -> anyhow::Result<(crate::stream_writer::StreamWriter, Instance, Store)> { let startup_span = tracing::info_span!("module instantiation").entered(); + let headers = crate::http_util::build_headers( matched_route, req, @@ -57,33 +75,25 @@ impl WasmRouteHandler { ); let stream_writer = crate::stream_writer::StreamWriter::new(); - let redirects = prepare_stdio_streams_for_http(request_body, stream_writer.clone(), global_context, logging_key)?; - let ctx = self.build_wasi_context_for_request(req, headers, redirects.streams)?; - let (store, instance) = self.prepare_wasm_instance(global_context, ctx)?; - - // Drop manually to get instantiation time + drop(startup_span); + + Ok((stream_writer, instance, store)) + } + fn spawn_wasm_instance(&self, instance: Instance, store: Store, mut stream_writer: StreamWriter) { let entrypoint = self.entrypoint.clone(); let wasm_module_name = self.wasm_module_name.clone(); - let mut sw = stream_writer.clone(); + tokio::spawn(async move { match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) { - Ok(()) => sw.done().unwrap(), // TODO: <-- + Ok(()) => stream_writer.done().unwrap(), // TODO: <-- Err(e) => tracing::error!("oh no {}", e), // TODO: behaviour? message? MESSAGE, IVAN?! 
}; }); - - let response = Response::new(Body::wrap_stream(stream_writer.as_stream())); - // TODO: c'mon man - std::thread::sleep(std::time::Duration::from_millis(10)); - - // TODO: headers headers headers - - Ok(response) } fn build_wasi_context_for_request(&self, req: &Parts, headers: HashMap, redirects: crate::wasm_module::IOStreamRedirects) -> Result { @@ -126,34 +136,12 @@ impl WasmRouteHandler { } } -pub fn compose_response(stdout_mutex: Arc>>) -> Result, Error> { - // Okay, once we get here, all the information we need to send back in the response - // should be written to the STDOUT buffer. We fetch that, format it, and send - // it back. In the process, we might need to alter the status code of the result. - // - // This is a little janky, but basically we are looping through the output once, - // looking for the double-newline that distinguishes the headers from the body. - // The headers can then be parsed separately, while the body can be sent back - // to the client. - - let out = stdout_mutex.read().unwrap(); - let mut last = 0; - let mut scan_headers = true; - let mut buffer: Vec = Vec::new(); - let mut out_headers: Vec = Vec::new(); - out.iter().for_each(|i| { - if scan_headers && *i == 10 && last == 10 { - out_headers.append(&mut buffer); - buffer = Vec::new(); - scan_headers = false; - return; // Consume the linefeed - } - last = *i; - buffer.push(*i) - }); - let mut res = Response::new(Body::from(buffer)); +pub async fn compose_response(mut stream_writer: StreamWriter) -> anyhow::Result> { + let header_block = stream_writer.header_block().await?; + let mut res = Response::new(Body::wrap_stream(stream_writer.as_stream())); + let mut sufficient_response = false; - parse_cgi_headers(String::from_utf8(out_headers)?) + parse_cgi_headers(String::from_utf8(header_block)?) .iter() .for_each(|h| { use hyper::header::{CONTENT_TYPE, LOCATION}; diff --git a/src/stream_writer.rs b/src/stream_writer.rs index ca1a990..9634cd3 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -54,6 +54,24 @@ impl StreamWriter { } } + pub async fn header_block(&mut self) -> anyhow::Result> { + loop { + match self.pending.write().as_deref_mut() { + Ok(pending) => match split_at_two_newlines(&pending) { + None => (), + Some((header_block, rest)) => { + *pending = rest; + return Ok(header_block); + } + }, + Err(e) => { + return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e)); + }, + } + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + } + } + pub fn as_stream(mut self) -> impl futures_core::stream::Stream>> { stream! 
{ loop { @@ -132,3 +150,47 @@ impl Write for StreamWriter { Ok(()) } } + +fn split_at_two_newlines(source: &[u8]) -> Option<(Vec, Vec)> { + let mut buffer = vec![]; + let mut last: u8 = 0; + for value in source { + if *value == 10 && last == 10 { + let rest_slice = &source[(buffer.len() + 1)..]; + let rest = Vec::from(rest_slice); + return Some((buffer, rest)); + } else { + buffer.push(*value); + last = *value; + } + } + None +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn splits_at_two_newlines_if_pair_only() { + let source: Vec = vec![0x41, 0x42, 0x0a, 0x0a, 0x43, 0x44]; + let result = split_at_two_newlines(&source).expect("did not split at all"); + assert_eq!(vec![0x41, 0x42, 0x0a], result.0); + assert_eq!(vec![0x43, 0x44], result.1); + } + + #[test] + fn doesnt_splits_at_two_newlines_if_no_pair() { + let source: Vec = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x45, 0x46]; + let result = split_at_two_newlines(&source); + assert_eq!(None, result); + } + + #[test] + fn splits_at_two_newlines_empty_rest_if_at_end() { + let source: Vec = vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a, 0x0a]; + let result = split_at_two_newlines(&source).expect("did not split at all"); + assert_eq!(vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a], result.0); + assert!(result.1.is_empty()); + } +} From 1c68fba2edc364135a2d5bb790a93d9395d789e4 Mon Sep 17 00:00:00 2001 From: itowlson Date: Mon, 22 Nov 2021 10:55:20 +1300 Subject: [PATCH 4/7] Well, arsebiscuits --- src/handlers.rs | 3 +- src/stream_writer.rs | 68 ++++++++++++++++++++++++++------------------ 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index 26b11d3..44ef42a 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -55,8 +55,7 @@ impl WasmRouteHandler { let response = compose_response(stream_writer).await?; // TODO: handle errors - // TODO: c'mon man - tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + tokio::task::yield_now().await; Ok(response) } diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 9634cd3..7b6e23a 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -7,21 +7,22 @@ pub struct StreamWriter { pending: Arc>>, done: Arc>, // A way for the write side to signal new data to the stream side - write_index: Arc>, - write_index_sender: Arc>, - write_index_receiver: tokio::sync::watch::Receiver, + // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY + // write_index: Arc>, + // write_index_sender: Arc>, + // write_index_receiver: tokio::sync::watch::Receiver, } impl StreamWriter { pub fn new() -> Self { - let write_index = 0; - let (tx, rx) = tokio::sync::watch::channel(write_index); + // let write_index = 0; + // let (tx, rx) = tokio::sync::watch::channel(write_index); Self { pending: Arc::new(RwLock::new(vec![])), done: Arc::new(RwLock::new(false)), - write_index: Arc::new(RwLock::new(write_index)), - write_index_sender: Arc::new(tx), - write_index_receiver: rx, + // write_index: Arc::new(RwLock::new(write_index)), + // write_index_sender: Arc::new(tx), + // write_index_receiver: rx, } } @@ -34,11 +35,14 @@ impl StreamWriter { Err(e) => Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) }; - { - let mut write_index = self.write_index.write().unwrap(); - *write_index = *write_index + 1; - self.write_index_sender.send(*write_index).unwrap(); - } + // This was meant to wake up listener threads when there was new data but it ended up + // just stalling until input was complete. 
TODO: investigate so we can get rid of the + // duration-based polling. + // { + // let mut write_index = self.write_index.write().unwrap(); + // *write_index = *write_index + 1; + // self.write_index_sender.send(*write_index).unwrap(); + // } result } @@ -68,6 +72,9 @@ impl StreamWriter { return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e)); }, } + // See comments on the as_stream loop, though using the change signal + // blocked this *completely* until end of writing! (And everything else + // waits on this.) tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; } } @@ -82,21 +89,26 @@ impl StreamWriter { if self.is_done() { return; } else { - // Not sure this is the smoothest way to do it. The oldest way was: - // tokio::time::sleep(tokio::time::Duration::from_micros(20)).await; - // which is a hideous kludge but subjectively felt quicker (but the - // number say not, so what is truth anyway) - match self.write_index_receiver.changed().await { - Ok(_) => continue, - Err(e) => { - // If this ever happens (which it, cough, shouldn't), it means all senders have - // closed, which _should_ mean we are done. Log the error - // but don't return it to the stream: the response as streamed so far - // _should_ be okay! - tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); - return; - } - } + // Not sure how to do this better. I tried using a signal that data + // had changed (via tokio::sync::watch::channel()), but that effectively + // blocked - we got the first chunk quickly but then it stalled waiting + // for the change notification. Polling is awful (and this interval is + // probably too aggressive) but I don't know how to get signalling + // to work! + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + + // For the record: this is what I tried: + // match self.write_index_receiver.changed().await { + // Ok(_) => continue, + // Err(e) => { + // // If this ever happens (which it, cough, shouldn't), it means all senders have + // // closed, which _should_ mean we are done. Log the error + // // but don't return it to the stream: the response as streamed so far + // // _should_ be okay! + // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); + // return; + // } + // } } } else { yield Ok(v); From 484d071928c822ed802433ec249018ae4711ab43 Mon Sep 17 00:00:00 2001 From: itowlson Date: Mon, 22 Nov 2021 16:13:35 +1300 Subject: [PATCH 5/7] Added a test for streaming --- src/stream_writer.rs | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 7b6e23a..36f3e5e 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -33,7 +33,7 @@ impl StreamWriter { Ok(()) }, Err(e) => - Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) + Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) }; // This was meant to wake up listener threads when there was new data but it ended up // just stalling until input was complete. 
TODO: investigate so we can get rid of the @@ -181,6 +181,8 @@ fn split_at_two_newlines(source: &[u8]) -> Option<(Vec, Vec)> { #[cfg(test)] mod test { + use futures::StreamExt; + use super::*; #[test] @@ -205,4 +207,31 @@ mod test { assert_eq!(vec![0x41, 0x42, 0x0a, 0x43, 0x44, 0x0a], result.0); assert!(result.1.is_empty()); } + + #[tokio::test] + async fn streaming_splits_out_headers() { + let mut sw = StreamWriter::new(); + let mut sw2 = sw.clone(); + tokio::spawn(async move { + write!(sw2, "Header 1\n").unwrap(); + write!(sw2, "Header 2\n").unwrap(); + write!(sw2, "\n").unwrap(); + write!(sw2, "Body 1\n").unwrap(); + write!(sw2, "Body 2\n").unwrap(); + sw2.done().unwrap(); + }); + let header = sw.header_block().await.unwrap(); + let header_text = String::from_utf8(header).unwrap(); + assert!(header_text.contains("Header 1\n")); + assert!(header_text.contains("Header 2\n")); + + let mut stm = Box::pin(sw.as_stream()); + let mut body = vec![]; + while let Some(Ok(v)) = stm.next().await { + body.extend_from_slice(&v); + } + let body_text = String::from_utf8(body).unwrap(); + assert!(body_text.contains("Body 1\n")); + assert!(body_text.contains("Body 2\n")); + } } From ada681d71c78db52bffc02e893771aa4850ee996 Mon Sep 17 00:00:00 2001 From: itowlson Date: Tue, 23 Nov 2021 14:31:36 +1300 Subject: [PATCH 6/7] Wait for change works well if combined with tiny wait --- src/stream_writer.rs | 76 +++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 36f3e5e..d8fed8d 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -7,22 +7,21 @@ pub struct StreamWriter { pending: Arc>>, done: Arc>, // A way for the write side to signal new data to the stream side - // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY - // write_index: Arc>, - // write_index_sender: Arc>, - // write_index_receiver: tokio::sync::watch::Receiver, + write_index: Arc>, + write_index_sender: Arc>, + write_index_receiver: tokio::sync::watch::Receiver, } impl StreamWriter { pub fn new() -> Self { - // let write_index = 0; - // let (tx, rx) = tokio::sync::watch::channel(write_index); + let write_index = 0; + let (tx, rx) = tokio::sync::watch::channel(write_index); Self { pending: Arc::new(RwLock::new(vec![])), done: Arc::new(RwLock::new(false)), - // write_index: Arc::new(RwLock::new(write_index)), - // write_index_sender: Arc::new(tx), - // write_index_receiver: rx, + write_index: Arc::new(RwLock::new(write_index)), + write_index_sender: Arc::new(tx), + write_index_receiver: rx, } } @@ -35,14 +34,12 @@ impl StreamWriter { Err(e) => Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) }; - // This was meant to wake up listener threads when there was new data but it ended up - // just stalling until input was complete. TODO: investigate so we can get rid of the - // duration-based polling. - // { - // let mut write_index = self.write_index.write().unwrap(); - // *write_index = *write_index + 1; - // self.write_index_sender.send(*write_index).unwrap(); - // } + { + let mut write_index = self.write_index.write().unwrap(); + *write_index = *write_index + 1; + self.write_index_sender.send(*write_index).unwrap(); + drop(write_index); + } result } @@ -89,28 +86,33 @@ impl StreamWriter { if self.is_done() { return; } else { - // Not sure how to do this better. 
I tried using a signal that data - // had changed (via tokio::sync::watch::channel()), but that effectively - // blocked - we got the first chunk quickly but then it stalled waiting - // for the change notification. Polling is awful (and this interval is - // probably too aggressive) but I don't know how to get signalling - // to work! - tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; - - // For the record: this is what I tried: - // match self.write_index_receiver.changed().await { - // Ok(_) => continue, - // Err(e) => { - // // If this ever happens (which it, cough, shouldn't), it means all senders have - // // closed, which _should_ mean we are done. Log the error - // // but don't return it to the stream: the response as streamed so far - // // _should_ be okay! - // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); - // return; - // } - // } + // This tiny wait seems to help the write-stream pipeline to flow more smmoothly. + // If we go straight to the 'changed().await' then the pipeline seems to stall after + // a few dozen writes, and everything else gets held up until the entire output + // has been written. There may be better ways of doing this; I haven't found them + // yet. + // + // (By the way, having the timer but not the change notification also worked. But if + // writes came slowly, that would result in very aggressive polling. So hopefully this + // gives us the best of both worlds.) + tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await; + + match self.write_index_receiver.changed().await { + Ok(_) => continue, + Err(e) => { + // If this ever happens (which it, cough, shouldn't), it means all senders have + // closed, which _should_ mean we are done. Log the error + // but don't return it to the stream: the response as streamed so far + // _should_ be okay! + tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); + return; + } + } } } else { + // This tiny wait seems to help the write-stream pipeline to flow more smmoothly. + // See the comment on the 'empty buffer' case. + tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await; yield Ok(v); } }, From 99620063bc25df329bc2028cb71081af344f358c Mon Sep 17 00:00:00 2001 From: itowlson Date: Tue, 23 Nov 2021 15:33:41 +1300 Subject: [PATCH 7/7] Make errors slightly less worse --- src/handlers.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/handlers.rs b/src/handlers.rs index 44ef42a..1f4eb02 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -53,7 +53,13 @@ impl WasmRouteHandler { self.set_up_runtime_environment(matched_route, req, request_body, request_context, global_context, logging_key)?; self.spawn_wasm_instance(instance, store, stream_writer.clone()); - let response = compose_response(stream_writer).await?; // TODO: handle errors + let response = match compose_response(stream_writer).await { + Ok(r) => r, + Err(e) => { + tracing::error!("Error parsing guest output into HTTP response: {}", e); + internal_error("internal error calling application") + } + }; tokio::task::yield_now().await; @@ -87,11 +93,18 @@ impl WasmRouteHandler { let entrypoint = self.entrypoint.clone(); let wasm_module_name = self.wasm_module_name.clone(); + // This is fire and forget, so there's a limited amount of error handling we + // can do. 
tokio::spawn(async move { match run_prepared_wasm_instance(instance, store, &entrypoint, &wasm_module_name) { - Ok(()) => stream_writer.done().unwrap(), // TODO: <-- - Err(e) => tracing::error!("oh no {}", e), // TODO: behaviour? message? MESSAGE, IVAN?! + Ok(()) => (), + Err(e) => tracing::error!("Error running Wasm module: {}", e), }; + // TODO: should we attempt to write an error response to the StreamWriter here? + match stream_writer.done() { + Ok(()) => (), + Err(e) => tracing::error!("Error marking Wasm output as done: {}", e), + } }); }
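
The shape these patches converge on is: a cloneable StreamWriter that implements std::io::Write (so wasi_common's WritePipe can use it as the guest's stdout), a shared pending buffer and done flag behind Arc<RwLock<...>>, a tokio watch channel so the write side can wake the stream side, and an as_stream() view that hyper wraps with Body::wrap_stream. For reference, here is a minimal self-contained sketch of that pattern. It is not the Wagi code itself: the names (ChunkWriter, into_stream) are invented for illustration, the CGI header-block splitting and most error handling are omitted, and it assumes the tokio, futures and async-stream crates.

use std::io::Write;
use std::sync::{Arc, RwLock};

use async_stream::stream;
use futures::StreamExt;

// Cloneable writer: the blocking (Wasm) side writes bytes, the async side
// streams them out as they arrive.
#[derive(Clone)]
struct ChunkWriter {
    pending: Arc<RwLock<Vec<u8>>>,
    done: Arc<RwLock<bool>>,
    notify_tx: Arc<tokio::sync::watch::Sender<()>>,
    notify_rx: tokio::sync::watch::Receiver<()>,
}

impl ChunkWriter {
    fn new() -> Self {
        let (notify_tx, notify_rx) = tokio::sync::watch::channel(());
        Self {
            pending: Arc::new(RwLock::new(vec![])),
            done: Arc::new(RwLock::new(false)),
            notify_tx: Arc::new(notify_tx),
            notify_rx,
        }
    }

    // Mark end of output and wake the stream side one last time.
    fn done(&self) {
        *self.done.write().unwrap() = true;
        let _ = self.notify_tx.send(());
    }

    fn is_done(&self) -> bool {
        *self.done.read().unwrap()
    }

    // Drain whatever has been written since the last poll.
    fn pop(&self) -> Vec<u8> {
        std::mem::take(&mut *self.pending.write().unwrap())
    }

    // Async view of the buffer: yields chunks until done() has been called
    // and the buffer is empty.
    fn into_stream(mut self) -> impl futures::Stream<Item = Result<Vec<u8>, std::convert::Infallible>> {
        stream! {
            loop {
                let chunk = self.pop();
                if !chunk.is_empty() {
                    yield Ok(chunk);
                } else if self.is_done() {
                    return;
                } else if self.notify_rx.changed().await.is_err() {
                    return; // every sender has gone away; treat as done
                }
            }
        }
    }
}

impl Write for ChunkWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.pending.write().unwrap().extend_from_slice(buf);
        let _ = self.notify_tx.send(()); // signal "new data" to the stream side
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let writer = ChunkWriter::new();
    let mut guest_side = writer.clone();

    // Stand-in for the spawned Wasm run: a blocking writer on another thread.
    let guest = tokio::task::spawn_blocking(move || {
        for i in 0..5 {
            writeln!(guest_side, "chunk {}", i).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        guest_side.done();
    });

    // In Wagi this stream would be handed to Body::wrap_stream.
    let mut chunks = Box::pin(writer.into_stream());
    while let Some(Ok(chunk)) = chunks.next().await {
        print!("{}", String::from_utf8_lossy(&chunk));
    }
    guest.await.unwrap();
}

Compared with the mpsc-based SenderPlusPlus2 from patch 1, this keeps the write side synchronous and never blocks it on a full channel: the watch channel only signals "something changed", and the stream side drains whatever has accumulated, which is why the retry-and-sleep loop could be dropped.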