@@ -7,21 +7,22 @@ pub struct StreamWriter {
7
7
pending : Arc < RwLock < Vec < u8 > > > ,
8
8
done : Arc < RwLock < bool > > ,
9
9
// A way for the write side to signal new data to the stream side
10
- write_index : Arc < RwLock < i64 > > ,
11
- write_index_sender : Arc < tokio:: sync:: watch:: Sender < i64 > > ,
12
- write_index_receiver : tokio:: sync:: watch:: Receiver < i64 > ,
10
+ // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY
11
+ // write_index: Arc<RwLock<i64>>,
12
+ // write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
13
+ // write_index_receiver: tokio::sync::watch::Receiver<i64>,
13
14
}
14
15
15
16
impl StreamWriter {
16
17
pub fn new ( ) -> Self {
17
- let write_index = 0 ;
18
- let ( tx, rx) = tokio:: sync:: watch:: channel ( write_index) ;
18
+ // let write_index = 0;
19
+ // let (tx, rx) = tokio::sync::watch::channel(write_index);
19
20
Self {
20
21
pending : Arc :: new ( RwLock :: new ( vec ! [ ] ) ) ,
21
22
done : Arc :: new ( RwLock :: new ( false ) ) ,
22
- write_index : Arc :: new ( RwLock :: new ( write_index) ) ,
23
- write_index_sender : Arc :: new ( tx) ,
24
- write_index_receiver : rx,
23
+ // write_index: Arc::new(RwLock::new(write_index)),
24
+ // write_index_sender: Arc::new(tx),
25
+ // write_index_receiver: rx,
25
26
}
26
27
}
27
28
@@ -34,11 +35,14 @@ impl StreamWriter {
34
35
Err ( e) =>
35
36
Err ( anyhow:: anyhow!( "Internal error: StreamWriter::append can't take lock: {}" , e) )
36
37
} ;
37
- {
38
- let mut write_index = self . write_index . write ( ) . unwrap ( ) ;
39
- * write_index = * write_index + 1 ;
40
- self . write_index_sender . send ( * write_index) . unwrap ( ) ;
41
- }
38
+ // This was meant to wake up listener threads when there was new data but it ended up
39
+ // just stalling until input was complete. TODO: investigate so we can get rid of the
40
+ // duration-based polling.
41
+ // {
42
+ // let mut write_index = self.write_index.write().unwrap();
43
+ // *write_index = *write_index + 1;
44
+ // self.write_index_sender.send(*write_index).unwrap();
45
+ // }
42
46
result
43
47
}
44
48
@@ -68,6 +72,9 @@ impl StreamWriter {
68
72
return Err ( anyhow:: anyhow!( "Internal error: StreamWriter::header_block can't take lock: {}" , e) ) ;
69
73
} ,
70
74
}
75
+ // See comments on the as_stream loop, though using the change signal
76
+ // blocked this *completely* until end of writing! (And everything else
77
+ // waits on this.)
71
78
tokio:: time:: sleep ( tokio:: time:: Duration :: from_micros ( 1 ) ) . await ;
72
79
}
73
80
}
@@ -82,21 +89,26 @@ impl StreamWriter {
82
89
if self . is_done( ) {
83
90
return ;
84
91
} else {
85
- // Not sure this is the smoothest way to do it. The oldest way was:
86
- // tokio::time::sleep(tokio::time::Duration::from_micros(20)).await;
87
- // which is a hideous kludge but subjectively felt quicker (but the
88
- // number say not, so what is truth anyway)
89
- match self . write_index_receiver. changed( ) . await {
90
- Ok ( _) => continue ,
91
- Err ( e) => {
92
- // If this ever happens (which it, cough, shouldn't), it means all senders have
93
- // closed, which _should_ mean we are done. Log the error
94
- // but don't return it to the stream: the response as streamed so far
95
- // _should_ be okay!
96
- tracing:: error!( "StreamWriter::as_stream: error receiving write updates: {}" , e) ;
97
- return ;
98
- }
99
- }
92
+ // Not sure how to do this better. I tried using a signal that data
93
+ // had changed (via tokio::sync::watch::channel()), but that effectively
94
+ // blocked - we got the first chunk quickly but then it stalled waiting
95
+ // for the change notification. Polling is awful (and this interval is
96
+ // probably too aggressive) but I don't know how to get signalling
97
+ // to work!
98
+ tokio:: time:: sleep( tokio:: time:: Duration :: from_micros( 1 ) ) . await ;
99
+
100
+ // For the record: this is what I tried:
101
+ // match self.write_index_receiver.changed().await {
102
+ // Ok(_) => continue,
103
+ // Err(e) => {
104
+ // // If this ever happens (which it, cough, shouldn't), it means all senders have
105
+ // // closed, which _should_ mean we are done. Log the error
106
+ // // but don't return it to the stream: the response as streamed so far
107
+ // // _should_ be okay!
108
+ // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
109
+ // return;
110
+ // }
111
+ // }
100
112
}
101
113
} else {
102
114
yield Ok ( v) ;
0 commit comments