Skip to content

Commit

Permalink
Wait for change works well if combined with tiny wait
Browse files Browse the repository at this point in the history
  • Loading branch information
itowlson committed Nov 23, 2021
1 parent 709b602 commit 055a87d
Showing 1 changed file with 39 additions and 37 deletions.
76 changes: 39 additions & 37 deletions src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@ pub struct StreamWriter {
pending: Arc<RwLock<Vec<u8>>>,
done: Arc<RwLock<bool>>,
// A way for the write side to signal new data to the stream side
// ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY
// write_index: Arc<RwLock<i64>>,
// write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
// write_index_receiver: tokio::sync::watch::Receiver<i64>,
write_index: Arc<RwLock<i64>>,
write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
write_index_receiver: tokio::sync::watch::Receiver<i64>,
}

impl StreamWriter {
pub fn new() -> Self {
// let write_index = 0;
// let (tx, rx) = tokio::sync::watch::channel(write_index);
let write_index = 0;
let (tx, rx) = tokio::sync::watch::channel(write_index);
Self {
pending: Arc::new(RwLock::new(vec![])),
done: Arc::new(RwLock::new(false)),
// write_index: Arc::new(RwLock::new(write_index)),
// write_index_sender: Arc::new(tx),
// write_index_receiver: rx,
write_index: Arc::new(RwLock::new(write_index)),
write_index_sender: Arc::new(tx),
write_index_receiver: rx,
}
}

Expand All @@ -35,14 +34,12 @@ impl StreamWriter {
Err(e) =>
Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e))
};
// This was meant to wake up listener threads when there was new data but it ended up
// just stalling until input was complete. TODO: investigate so we can get rid of the
// duration-based polling.
// {
// let mut write_index = self.write_index.write().unwrap();
// *write_index = *write_index + 1;
// self.write_index_sender.send(*write_index).unwrap();
// }
{
let mut write_index = self.write_index.write().unwrap();
*write_index = *write_index + 1;
self.write_index_sender.send(*write_index).unwrap();
drop(write_index);
}
result
}

Expand Down Expand Up @@ -89,28 +86,33 @@ impl StreamWriter {
if self.is_done() {
return;
} else {
// Not sure how to do this better. I tried using a signal that data
// had changed (via tokio::sync::watch::channel()), but that effectively
// blocked - we got the first chunk quickly but then it stalled waiting
// for the change notification. Polling is awful (and this interval is
// probably too aggressive) but I don't know how to get signalling
// to work!
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;

// For the record: this is what I tried:
// match self.write_index_receiver.changed().await {
// Ok(_) => continue,
// Err(e) => {
// // If this ever happens (which it, cough, shouldn't), it means all senders have
// // closed, which _should_ mean we are done. Log the error
// // but don't return it to the stream: the response as streamed so far
// // _should_ be okay!
// tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
// return;
// }
// }
// This tiny wait seems to help the write-stream pipeline to flow more smmoothly.
// If we go straight to the 'changed().await' then the pipeline seems to stall after
// a few dozen writes, and everything else gets held up until the entire output
// has been written. There may be better ways of doing this; I haven't found them
// yet.
//
// (By the way, having the timer but not the change notification also worked. But if
// writes came slowly, that would result in very aggressive polling. So hopefully this
// gives us the best of both worlds.)
tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await;

match self.write_index_receiver.changed().await {
Ok(_) => continue,
Err(e) => {
// If this ever happens (which it, cough, shouldn't), it means all senders have
// closed, which _should_ mean we are done. Log the error
// but don't return it to the stream: the response as streamed so far
// _should_ be okay!
tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
return;
}
}
}
} else {
// This tiny wait seems to help the write-stream pipeline to flow more smmoothly.
// See the comment on the 'empty buffer' case.
tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await;
yield Ok(v);
}
},
Expand Down

0 comments on commit 055a87d

Please sign in to comment.