Skip to content

Commit

Permalink
Well, arsebiscuits
Browse files Browse the repository at this point in the history
  • Loading branch information
itowlson committed Nov 21, 2021
1 parent fd92a9e commit 69364fa
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 30 deletions.
3 changes: 1 addition & 2 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ impl WasmRouteHandler {

let response = compose_response(stream_writer).await?; // TODO: handle errors

// TODO: c'mon man
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
tokio::task::yield_now().await;

Ok(response)
}
Expand Down
68 changes: 40 additions & 28 deletions src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@ pub struct StreamWriter {
pending: Arc<RwLock<Vec<u8>>>,
done: Arc<RwLock<bool>>,
// A way for the write side to signal new data to the stream side
write_index: Arc<RwLock<i64>>,
write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
write_index_receiver: tokio::sync::watch::Receiver<i64>,
// ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY
// write_index: Arc<RwLock<i64>>,
// write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
// write_index_receiver: tokio::sync::watch::Receiver<i64>,
}

impl StreamWriter {
pub fn new() -> Self {
let write_index = 0;
let (tx, rx) = tokio::sync::watch::channel(write_index);
// let write_index = 0;
// let (tx, rx) = tokio::sync::watch::channel(write_index);
Self {
pending: Arc::new(RwLock::new(vec![])),
done: Arc::new(RwLock::new(false)),
write_index: Arc::new(RwLock::new(write_index)),
write_index_sender: Arc::new(tx),
write_index_receiver: rx,
// write_index: Arc::new(RwLock::new(write_index)),
// write_index_sender: Arc::new(tx),
// write_index_receiver: rx,
}
}

Expand All @@ -34,11 +35,14 @@ impl StreamWriter {
Err(e) =>
Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e))
};
{
let mut write_index = self.write_index.write().unwrap();
*write_index = *write_index + 1;
self.write_index_sender.send(*write_index).unwrap();
}
// This was meant to wake up listener threads when there was new data but it ended up
// just stalling until input was complete. TODO: investigate so we can get rid of the
// duration-based polling.
// {
// let mut write_index = self.write_index.write().unwrap();
// *write_index = *write_index + 1;
// self.write_index_sender.send(*write_index).unwrap();
// }
result
}

Expand Down Expand Up @@ -68,6 +72,9 @@ impl StreamWriter {
return Err(anyhow::anyhow!("Internal error: StreamWriter::header_block can't take lock: {}", e));
},
}
// See comments on the as_stream loop, though using the change signal
// blocked this *completely* until end of writing! (And everything else
// waits on this.)
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
}
}
Expand All @@ -82,21 +89,26 @@ impl StreamWriter {
if self.is_done() {
return;
} else {
// Not sure this is the smoothest way to do it. The oldest way was:
// tokio::time::sleep(tokio::time::Duration::from_micros(20)).await;
// which is a hideous kludge but subjectively felt quicker (but the
// number say not, so what is truth anyway)
match self.write_index_receiver.changed().await {
Ok(_) => continue,
Err(e) => {
// If this ever happens (which it, cough, shouldn't), it means all senders have
// closed, which _should_ mean we are done. Log the error
// but don't return it to the stream: the response as streamed so far
// _should_ be okay!
tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
return;
}
}
// Not sure how to do this better. I tried using a signal that data
// had changed (via tokio::sync::watch::channel()), but that effectively
// blocked - we got the first chunk quickly but then it stalled waiting
// for the change notification. Polling is awful (and this interval is
// probably too aggressive) but I don't know how to get signalling
// to work!
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;

// For the record: this is what I tried:
// match self.write_index_receiver.changed().await {
// Ok(_) => continue,
// Err(e) => {
// // If this ever happens (which it, cough, shouldn't), it means all senders have
// // closed, which _should_ mean we are done. Log the error
// // but don't return it to the stream: the response as streamed so far
// // _should_ be okay!
// tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
// return;
// }
// }
}
} else {
yield Ok(v);
Expand Down

0 comments on commit 69364fa

Please sign in to comment.