Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Does CStream need to have item type Option? #101

Open
nastynaz opened this issue May 2, 2024 · 0 comments
Open

Does CStream need to have item type Option? #101

nastynaz opened this issue May 2, 2024 · 0 comments

Comments

@nastynaz
Copy link

nastynaz commented May 2, 2024

First off, thank you guys for this repo. I've learned a lot from it.

I noticed when implementing a variant of CStream myself I didn't need to have the item type as Option:

impl KConsumer {
    pub fn new<T: AsRef<str>, V: AsRef<str>>(
        config: &Config,
        topic_name: V,
        consumer_group_id: T,
    ) -> Self {
        let consumer_config = config.build_consumer_config(consumer_group_id);
        let consumer: BaseConsumer<_> = consumer_config.create().expect("Consumer creation error");
        consumer
            .subscribe(&[topic_name.as_ref()])
            .expect("Can't subscribe to specified topic");

        Self {
            consumer: Arc::new(consumer),
        }
    }

    pub fn stream(&self) -> KStream {
        let (sender, receiver) = crossbeam::channel::unbounded();
        let consumer = self.consumer.clone();
        Self::gen_stream(sender, receiver, consumer)
    }

    fn gen_stream(
        sender: Sender<Option<OwnedMessage>>,
        receiver: Receiver<Option<OwnedMessage>>,
        consumer: Arc<BaseConsumer>,
    ) -> KStream {
        let _handle = thread::Builder::new()
            .name("kstream-gen".into())
            .spawn(move || {
                for m in consumer.iter() {
                    let msg = match m {
                        Ok(bm) => Some(bm.detach()),
                        Err(e) => {
                            tracing::error!("{}", e);
                            None
                        }
                    };

                    let _ = sender.send(msg);
                }
            });

        KStream { receiver }
    }
}

pin_project! {
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct KStream {
        #[pin]
        receiver: Receiver<Option<OwnedMessage>>,
    }
}

impl Stream for KStream {
    type Item = OwnedMessage; // this is no longer `Option<OwnedMessage>`

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        if let Ok(inner) = this.receiver.recv() {
            Poll::Ready(inner)
        } else {
            Poll::Pending
        }
    }
}

The stream no longer returns a doubly-nested Option. This allows for slightly more ergonomic iteration:

while let Some(msg) = stream.next().await {
// msg is no longer an Option here
}

I'm wondering if there was a reason you intended to return Options instead of just the message (note that the stream still returns an Option anyway)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant