diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 36f3e5e..d8fed8d 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -7,22 +7,21 @@ pub struct StreamWriter { pending: Arc>>, done: Arc>, // A way for the write side to signal new data to the stream side - // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY - // write_index: Arc>, - // write_index_sender: Arc>, - // write_index_receiver: tokio::sync::watch::Receiver, + write_index: Arc>, + write_index_sender: Arc>, + write_index_receiver: tokio::sync::watch::Receiver, } impl StreamWriter { pub fn new() -> Self { - // let write_index = 0; - // let (tx, rx) = tokio::sync::watch::channel(write_index); + let write_index = 0; + let (tx, rx) = tokio::sync::watch::channel(write_index); Self { pending: Arc::new(RwLock::new(vec![])), done: Arc::new(RwLock::new(false)), - // write_index: Arc::new(RwLock::new(write_index)), - // write_index_sender: Arc::new(tx), - // write_index_receiver: rx, + write_index: Arc::new(RwLock::new(write_index)), + write_index_sender: Arc::new(tx), + write_index_receiver: rx, } } @@ -35,14 +34,12 @@ impl StreamWriter { Err(e) => Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e)) }; - // This was meant to wake up listener threads when there was new data but it ended up - // just stalling until input was complete. TODO: investigate so we can get rid of the - // duration-based polling. - // { - // let mut write_index = self.write_index.write().unwrap(); - // *write_index = *write_index + 1; - // self.write_index_sender.send(*write_index).unwrap(); - // } + { + let mut write_index = self.write_index.write().unwrap(); + *write_index = *write_index + 1; + self.write_index_sender.send(*write_index).unwrap(); + drop(write_index); + } result } @@ -89,28 +86,33 @@ impl StreamWriter { if self.is_done() { return; } else { - // Not sure how to do this better. I tried using a signal that data - // had changed (via tokio::sync::watch::channel()), but that effectively - // blocked - we got the first chunk quickly but then it stalled waiting - // for the change notification. Polling is awful (and this interval is - // probably too aggressive) but I don't know how to get signalling - // to work! - tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; - - // For the record: this is what I tried: - // match self.write_index_receiver.changed().await { - // Ok(_) => continue, - // Err(e) => { - // // If this ever happens (which it, cough, shouldn't), it means all senders have - // // closed, which _should_ mean we are done. Log the error - // // but don't return it to the stream: the response as streamed so far - // // _should_ be okay! - // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); - // return; - // } - // } + // This tiny wait seems to help the write-stream pipeline to flow more smmoothly. + // If we go straight to the 'changed().await' then the pipeline seems to stall after + // a few dozen writes, and everything else gets held up until the entire output + // has been written. There may be better ways of doing this; I haven't found them + // yet. + // + // (By the way, having the timer but not the change notification also worked. But if + // writes came slowly, that would result in very aggressive polling. So hopefully this + // gives us the best of both worlds.) + tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await; + + match self.write_index_receiver.changed().await { + Ok(_) => continue, + Err(e) => { + // If this ever happens (which it, cough, shouldn't), it means all senders have + // closed, which _should_ mean we are done. Log the error + // but don't return it to the stream: the response as streamed so far + // _should_ be okay! + tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e); + return; + } + } } } else { + // This tiny wait seems to help the write-stream pipeline to flow more smmoothly. + // See the comment on the 'empty buffer' case. + tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await; yield Ok(v); } },