We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First off, thank you guys for this repo. I've learned a lot from it.
I noticed when implementing a variant of CStream myself I didn't need to have the item type as Option:
CStream
Option
impl KConsumer { pub fn new<T: AsRef<str>, V: AsRef<str>>( config: &Config, topic_name: V, consumer_group_id: T, ) -> Self { let consumer_config = config.build_consumer_config(consumer_group_id); let consumer: BaseConsumer<_> = consumer_config.create().expect("Consumer creation error"); consumer .subscribe(&[topic_name.as_ref()]) .expect("Can't subscribe to specified topic"); Self { consumer: Arc::new(consumer), } } pub fn stream(&self) -> KStream { let (sender, receiver) = crossbeam::channel::unbounded(); let consumer = self.consumer.clone(); Self::gen_stream(sender, receiver, consumer) } fn gen_stream( sender: Sender<Option<OwnedMessage>>, receiver: Receiver<Option<OwnedMessage>>, consumer: Arc<BaseConsumer>, ) -> KStream { let _handle = thread::Builder::new() .name("kstream-gen".into()) .spawn(move || { for m in consumer.iter() { let msg = match m { Ok(bm) => Some(bm.detach()), Err(e) => { tracing::error!("{}", e); None } }; let _ = sender.send(msg); } }); KStream { receiver } } } pin_project! { #[derive(Clone)] #[must_use = "streams do nothing unless polled"] pub struct KStream { #[pin] receiver: Receiver<Option<OwnedMessage>>, } } impl Stream for KStream { type Item = OwnedMessage; // this is no longer `Option<OwnedMessage>` fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = self.project(); if let Ok(inner) = this.receiver.recv() { Poll::Ready(inner) } else { Poll::Pending } } }
The stream no longer returns a doubly-nested Option. This allows for slightly more ergonomic iteration:
while let Some(msg) = stream.next().await { // msg is no longer an Option here }
I'm wondering if there was a reason you intended to return Options instead of just the message (note that the stream still returns an Option anyway)
The text was updated successfully, but these errors were encountered:
No branches or pull requests
First off, thank you guys for this repo. I've learned a lot from it.
I noticed when implementing a variant of
CStream
myself I didn't need to have the item type asOption
:The stream no longer returns a doubly-nested
Option
. This allows for slightly more ergonomic iteration:I'm wondering if there was a reason you intended to return
Option
s instead of just the message (note that the stream still returns an Option anyway)The text was updated successfully, but these errors were encountered: