Skip to content

Commit

Permalink
Tweak Kafka source
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jan 21, 2025
1 parent 8379cbe commit 7fa24c3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
1 change: 0 additions & 1 deletion src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl<T: Send + 'static> Receiver<T> {
}

#[inline]

pub async fn recv_async(&self) -> Result<T, RecvError> {
self.0.recv_async().await.map_err(RecvError::from)
}
Expand Down
2 changes: 0 additions & 2 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2161,7 +2161,6 @@ where
///
/// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
/// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
/// ## Example
/// ```
/// # use renoir::prelude::*;
Expand Down Expand Up @@ -2281,7 +2280,6 @@ where
///
/// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
/// ```
pub fn map_memo<O: Data + Sync, F>(
self,
f: F,
Expand Down
36 changes: 31 additions & 5 deletions src/operator/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum KafkaSourceInner {
Running {
rx: Receiver<OwnedMessage>,
cancel_token: Arc<AtomicBool>,
cooldown: bool,
}, // Terminated,
}

Expand Down Expand Up @@ -81,6 +82,7 @@ impl Operator for KafkaSource {
let (tx, rx) = flume::bounded(8);
let cancel_token = Arc::new(AtomicBool::new(false));
let cancel = cancel_token.clone();
tracing::debug!("started kafka source with topics {:?}", topics);
tokio::spawn(async move {
let mut stream = consumer.stream();
while let Some(msg) = stream.next().await {
Expand All @@ -101,19 +103,43 @@ impl Operator for KafkaSource {
.expect("kafka fail to commit");
}
});
self.inner = KafkaSourceInner::Running { rx, cancel_token };
self.inner = KafkaSourceInner::Running {
rx,
cancel_token,
cooldown: false,
};
}

fn next(&mut self) -> StreamElement<Self::Out> {
match &self.inner {
match &mut self.inner {
KafkaSourceInner::Init { .. } => {
unreachable!("KafkaSource executing before setup!")
}
// KafkaSourceInner::Terminated => return StreamElement::Terminate,
KafkaSourceInner::Running { rx, .. } => {
match rx.recv() {
KafkaSourceInner::Running { rx, cooldown, .. } => {
if *cooldown {
match rx.recv() {
Ok(msg) => {
*cooldown = false;
return StreamElement::Item(msg);
}
Err(flume::RecvError::Disconnected) => {
tracing::warn!("kafka background task disconnected.");
return StreamElement::Terminate;
}
}
}

match rx.recv_timeout(std::time::Duration::from_millis(100)) {
Ok(msg) => StreamElement::Item(msg),
Err(e) => panic!("kafka background task panicked: {e}"),
Err(flume::RecvTimeoutError::Timeout) => {
*cooldown = true;
StreamElement::FlushBatch
}
Err(flume::RecvTimeoutError::Disconnected) => {
tracing::warn!("kafka background task disconnected.");
StreamElement::Terminate
}
// StreamElement::Terminate,
}
}
Expand Down

0 comments on commit 7fa24c3

Please sign in to comment.