How to use mpsc::channel in single-send, probably single-read scenario? #6119
-
I use tonic+tokio to serve a grpc service. One of the rpc method is: Server read something (e.g. file) and streaming its appending data to client. service XXService {
rpc Tail(TailRequest) returns (stream TailResponse);
} My simple idea is that create a #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (sender, receiver) = mpsc::channel(1);
// read thread
read_buffer(sender);
// grpc server
let addr = "xxx".parse()?;
let xx_service = XXService::new(receiver);
Server::builder()
.add_service(XXServiceServer::new(xx_service))
.serve(addr)
.await?;
Ok(())
} I want to use the reference of the receiver, so that I can directly pass it to the rpc response. async fn tail(
&self,
request: Request<TailRequest>,
) -> Result<Response<Self::TailResponseStream>, Status> {
// error, of course
let reply_stream = ReceiverStream::new(self.receiver);
Ok(Response::new(
Box::pin(reply_stream) as Self::TailResponseStream
))
} I tried to use // stream
struct TailStream {
inner: Arc<Mutex<Receiver<Result<TailResponse, Status>>>>,
}
impl Stream for TailStream {
type Item = Result<TailResponse, Status>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.lock().unwrap().poll_recv(cx)
}
}
impl TailStream {
fn new(recv: Arc<Mutex<Receiver<Result<TailResponse, Status>>>>) -> Self {
Self { inner: recv }
}
}
// main
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let xx_service = XXService::new(Arc::new(Mutex::new(receiver)));
}
// rpc
async fn tail(
&self,
request: Request<TailRequest>,
) -> Result<Response<Self::TailResponseStream>, Status> {
let reply_stream = TailStream::new(self.receiver.clone());
Ok(Response::new(
Box::pin(reply_stream) as Self::TailResponseStream
))
} It works, but IMO the Some other methods I thought about but not satisfied:
I am new to rust and tokio, hope to get help from anyone 🙏 |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 6 replies
-
You definitely don't want an You haven't provided a complete example or an error message, so it's difficult to provide more complete advice than this. |
Beta Was this translation helpful? Give feedback.
You can do that like this:
This method will return the receiver on the first call, and
None
on any future calls.