@@ -135,38 +135,41 @@ where
135
135
E : From < irpc:: Error > ,
136
136
E : From < irpc:: channel:: RecvError > ,
137
137
{
138
- Gen :: new ( move |co| async move {
139
- let mut rx = match self . await {
140
- Ok ( rx) => rx,
141
- Err ( e) => {
142
- co. yield_ ( Err ( E :: from ( e) ) ) . await ;
143
- return ;
144
- }
145
- } ;
146
- loop {
147
- match rx. recv ( ) . await {
148
- Ok ( Some ( item) ) => match item. into_result_opt ( ) {
149
- Some ( Ok ( i) ) => co. yield_ ( Ok ( i) ) . await ,
150
- Some ( Err ( e) ) => {
151
- co. yield_ ( Err ( E :: from ( e) ) ) . await ;
152
- break ;
153
- }
154
- None => break ,
155
- } ,
156
- Ok ( None ) => {
157
- co. yield_ ( Err ( E :: from ( irpc:: channel:: RecvError :: Io ( io:: Error :: new (
158
- io:: ErrorKind :: UnexpectedEof ,
159
- "unexpected end of stream" ,
160
- ) ) ) ) )
161
- . await ;
162
- break ;
163
- }
164
- Err ( e) => {
165
- co. yield_ ( Err ( E :: from ( e) ) ) . await ;
166
- break ;
167
- }
168
- }
138
+ enum State < S , T > {
139
+ Init ( S ) ,
140
+ Receiving ( mpsc:: Receiver < T > ) ,
141
+ Done ,
142
+ }
143
+ fn eof ( ) -> RecvError {
144
+ io:: Error :: new ( io:: ErrorKind :: UnexpectedEof , "unexpected end of stream" ) . into ( )
145
+ }
146
+ async fn process_recv < S , T , E > (
147
+ mut rx : mpsc:: Receiver < T > ,
148
+ ) -> Option < ( std:: result:: Result < T :: Item , E > , State < S , T > ) >
149
+ where
150
+ T : IrpcStreamItem ,
151
+ E : From < T :: Error > ,
152
+ E : From < irpc:: Error > ,
153
+ E : From < RecvError > ,
154
+ {
155
+ match rx. recv ( ) . await {
156
+ Ok ( Some ( item) ) => match item. into_result_opt ( ) ? {
157
+ Ok ( i) => Some ( ( Ok ( i) , State :: Receiving ( rx) ) ) ,
158
+ Err ( e) => Some ( ( Err ( E :: from ( e) ) , State :: Done ) ) ,
159
+ } ,
160
+ Ok ( None ) => Some ( ( Err ( E :: from ( eof ( ) ) ) , State :: Done ) ) ,
161
+ Err ( e) => Some ( ( Err ( E :: from ( e) ) , State :: Done ) ) ,
162
+ }
163
+ }
164
+ Box :: pin ( stream:: unfold ( State :: Init ( self ) , |state| async move {
165
+ match state {
166
+ State :: Init ( fut) => match fut. await {
167
+ Ok ( rx) => process_recv ( rx) . await ,
168
+ Err ( e) => Some ( ( Err ( E :: from ( e) ) , State :: Done ) ) ,
169
+ } ,
170
+ State :: Receiving ( rx) => process_recv ( rx) . await ,
171
+ State :: Done => None ,
169
172
}
170
- } )
173
+ } ) )
171
174
}
172
175
}
0 commit comments