Batch Kafka consumer #151

Closed · wants to merge 2 commits
10 changes: 10 additions & 0 deletions hie.yaml
@@ -0,0 +1,10 @@
+cradle:
+  stack:
+    - path: "./library"
+      component: "freckle-app:lib"

+    - path: "./doctest"
+      component: "freckle-app:test:doctest"

+    - path: "./tests"
+      component: "freckle-app:test:spec"
48 changes: 31 additions & 17 deletions library/Freckle/App/Kafka/Consumer.hs
@@ -164,31 +164,39 @@ runConsumer
      , HasCallStack
      )
   => Timeout
+  -> Int
   -> (a -> m ())
@z0isch (Contributor, Author) commented on Mar 6, 2024:
Is this the right shape for the handler? Should it be [a] -> m () instead?

Seems like it'd be more flexible for the client to do "smart" things if we hand them the entire batch. I'm thinking of things like doing concurrent actions or grouping stuff together.
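
As a rough sketch of the two shapes (constraints and the real Timeout type are elided; PollTimeout, perMessage, and perBatch are hypothetical names, not part of this PR):

```haskell
-- Sketch only: PollTimeout stands in for freckle-app's Timeout type, and
-- the bodies are stubs.
type PollTimeout = Int

-- Shape in this PR: the library drives the loop and calls the handler once
-- per decoded record.
perMessage :: Monad m => PollTimeout -> Int -> (a -> m ()) -> m ()
perMessage _timeout _batchSize _onMessage = pure ()

-- Alternative: hand the whole decoded batch to the caller, who can group
-- records or process them concurrently.
perBatch :: Monad m => PollTimeout -> Int -> ([a] -> m ()) -> m ()
perBatch _timeout _batchSize _onBatch = pure ()
```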

A Member replied:

Good question. I defer to you.

Practically speaking, I'd put my money on consumers not doing smart things, but it's also trivial for them to do their own traverse_.
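
For instance, a minimal sketch of that traverse_ (handleOne here is a hypothetical per-record handler):

```haskell
import Data.Foldable (traverse_)

-- Sketch: a per-record handler lifted to a batch handler by traversing the
-- batch.
handleBatch :: Applicative m => (a -> m ()) -> [a] -> m ()
handleBatch handleOne = traverse_ handleOne
```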

@z0isch (Contributor, Author) replied:

I think [a] -> m () also implies that we can't store or commit offsets on each event, so I guess for the first attempt at this I can go with the a -> m () shape and see what happens.
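
A sketch of that trade-off (not code from this PR; the storeOffset and handler arguments are hypothetical stand-ins):

```haskell
import Data.Foldable (for_)

-- Per-message shape: the library regains control after every record, so it
-- can store that record's offset as soon as its handler succeeds.
perMessageLoop :: Monad m => (r -> m ()) -> (r -> m ()) -> [r] -> m ()
perMessageLoop storeOffset onMessage records =
  for_ records $ \r -> do
    onMessage r
    storeOffset r

-- Batch shape: the library only regains control once the whole batch is
-- done, so offsets can only be stored after every record has been handled.
perBatchLoop :: Monad m => (r -> m ()) -> ([r] -> m ()) -> [r] -> m ()
perBatchLoop storeOffset onBatch records = do
  onBatch records
  for_ records storeOffset
```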

   -> m ()
-runConsumer pollTimeout onMessage =
+runConsumer pollTimeout batchSize onMessage =
   withTraceIdContext $ immortalCreate onFinish $ do
     consumer <- view kafkaConsumerL

     flip catches handlers $ inSpan "kafka.consumer" consumerSpanArguments $ do
-      mRecord <- fromKafkaError =<< pollMessage consumer kTimeout
-
-      for_ mRecord $ \r -> do
-        for_ (crValue r) $ \bs -> do
-          a <-
-            inSpan "kafka.consumer.message.decode" defaultSpanArguments $
-              either (throwM . KafkaMessageDecodeError bs) pure $
-                eitherDecodeStrict bs
-          inSpan "kafka.consumer.message.handle" defaultSpanArguments $ onMessage a
-
-          inSpan "kafka.consumer.message.commit" defaultSpanArguments $ do
-            -- Store the offset of this record, then do a best-effort commit to
-            -- the broker. If this fails and we crash or shutdown, the commit in
-            -- the onFinish handler will pick it up.
-            logExMay "Unable to store offset" $ storeOffsetMessage consumer r
-            void $ commitAllOffsets OffsetCommitAsync consumer
+      mRecords <-
+        traverse fromKafkaError
+          =<< pollMessageBatch consumer kTimeout kBatchSize
+
+      for_ mRecords $ \mRecord -> do
+        for_ mRecord $ \r -> do
+          for_ (crValue r) $ \bs -> do
+            a <-
+              inSpan "kafka.consumer.message.decode" defaultSpanArguments $
+                either (throwM . KafkaMessageDecodeError bs) pure $
+                  eitherDecodeStrict bs
+            inSpan "kafka.consumer.message.handle" defaultSpanArguments $ onMessage a
+            logExMay
+              "Unable to store offset"
+              (silenceNoOffsetError <$> storeOffsetMessage consumer r)
+
+          inSpan "kafka.consumer.message.commit" defaultSpanArguments $
+            -- Do a best-effort commit to the broker. If this fails and
+            -- we crash or shutdown, the commit in the onFinish handler
+            -- will pick it up.
+            void $
+              commitAllOffsets OffsetCommitAsync consumer
  where
   kTimeout = Kafka.Timeout $ timeoutMs pollTimeout
+  kBatchSize = BatchSize batchSize

   handlers =
     [ ExceptionHandler $
@@ -213,6 +221,12 @@ runConsumer pollTimeout onMessage =
         . annotatedExceptionMessageFrom (const msg)
         . AnnotatedException []

+-- It should not be considered an error if we have no offsets to commit
+silenceNoOffsetError :: Maybe KafkaError -> Maybe KafkaError
+silenceNoOffsetError = \case
+  Just (KafkaResponseError RdKafkaRespErrNoOffset) -> Nothing
+  e -> e

 fromKafkaError
   :: (MonadIO m, MonadLogger m, HasCallStack)
   => Either KafkaError a
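
For reference, a minimal usage sketch of the batched runConsumer (not part of the diff): App is assumed to satisfy runConsumer's constraints, and MyEvent (a FromJSON payload), startMyConsumer, and processEvent are hypothetical names.

```haskell
-- Sketch: poll batches of up to 100 records and handle each decoded
-- MyEvent individually. runConsumer stores each record's offset after its
-- handler succeeds and does a best-effort commit back to the broker.
startMyConsumer :: Timeout -> App ()
startMyConsumer pollTimeout =
  runConsumer pollTimeout 100 processEvent
 where
  processEvent :: MyEvent -> App ()
  processEvent _event = pure () -- hypothetical per-event handler
```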