-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful shutdown of a stream for a single subscription #1201
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't look at the implementation yet, only docs and tests.
zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Outdated
Show resolved
Hide resolved
zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Outdated
Show resolved
Hide resolved
zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Outdated
Show resolved
Hide resolved
zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still need more time to digest this.
zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala
Outdated
Show resolved
Hide resolved
Hmm, should we instead of this: Consumer.runWithGracefulShutdown(Consumer.partitionedStreamWithControl(Subscription.topics("topic150"), Serde.string, Serde.string)) {
stream => ...
} offer this: Consumer.partitionedStreamWithGracefulShutdown(Subscription.topics("topic150"), Serde.string, Serde.string) {
(stream, _) => stream.flatMapPar(...)
} The second parameter would be the |
If I understand it correctly, the proposal allows for more use cases; with it you can also call |
Well, I mean compared to just the
|
If resume after |
Well, in both proposals you can call I don't think you want to do anything after stop, but it would give you more explicit control when to stop, instead of when the scope ends. We probably need to decide if we want to add pause/resume in the future. If we do, we should add the |
Hey :) Thanks for the great work! Here's some initial feedback: I'm not a big fan of the To me, functions/methods returning it should return a Tuple SubscriptionStreamControl[Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]] in favor of: (Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])], SubscriptionStreamControl) Made the change in a PR to show/study how, to me, it simplifies things: https://github.com/zio/zio-kafka/pull/1207/files |
zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
Outdated
Show resolved
Hide resolved
Didn't finish my review yet. I still have some parts of the code to explore/understand, but I have to go. I'll finish it later 🙂 |
Thanks for the feedback Jules. Agreed about the extra concept that would be unwanted. Check out my latest interface proposal where there is only a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still reading the code...
zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/SubscriptionStreamControl.scala
Outdated
Show resolved
Hide resolved
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Outdated
Show resolved
Hide resolved
I understand now that when graceful shutdown starts we're ending the subscribed streams. That should work nicely. Lets work out what will happen next to the runloop. The runloop would still be happily fetching records for that stream. When those are offered to the stream, We can do slightly better though. We're fetching and storing all these records in the queue for nothing, even potentially causing an OOM for systems that are tuned for the case where processing happens almost immediately. My proposal is to:
If you want, I can extend this PR with that proposal (or create a separate PR). |
@erikvanoosten If you have some time to implement those two things, by all means. |
@svroonland Done in commit 1218204. Now I am wondering, how can we test this? |
Change looks good. Totally forgot to implement this part. |
Was able to create a minimized reproducer of the issue: zio/zio#9288 |
The abovementioned issue has been fixed and will probably be in the next ZIO release. For compatibility with older versions let's keep the |
Great work! Shall we slap a comment on it? E.g. something like:
|
This reverts commit f899758.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So many little comments. I'll push this now so you have something to work with. Meanwhile I'll continue reviewing.
|
||
zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing in-flight records to be fully processed. | ||
|
||
Use the `with*Stream` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods accept a parameter that describes the execution of a stream, which is gracefully ended when the method is interrupted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can extend this a bit more before we merge this PR.
case RunloopCommand.EndStreamsBySubscription(subscription, cont) => | ||
ZIO.foreachDiscard( | ||
state.assignedStreams.filter(stream => Subscription.subscriptionMatches(subscription, stream.tp)) | ||
)(_.end) *> cont |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The formatter makes some weird choices here. Would this case
be more readable when written as a for comprehension?
@@ -203,17 +213,20 @@ private[consumer] final class Runloop private ( | |||
|
|||
private def handlePoll(state: State): Task[State] = { | |||
for { | |||
partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) | |||
pendingCommitCount <- committer.pendingCommitCount | |||
runningStreamsBeforePoll <- ZIO.filter(state.assignedStreams)(_.isRunning) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The isRunning
is based on whether the stream control ended or not. However, we can/could do better by not providing data for ended subscriptions instead. WDYT?
/** | ||
* Allows graceful shutdown of a stream, where no more records are being fetched but the in-flight records can continue | ||
* to be processed and their offsets committed. | ||
* | ||
* @param stream | ||
* The stream of partitions / records for this subscription | ||
* @param stop | ||
* Stop fetching records for the subscribed topic-partitions and end the associated streams, while allowing commits to | ||
* proceed (consumer remains subscribed) | ||
*/ | ||
final private[consumer] case class SubscriptionStreamControl[S <: ZStream[_, _, _]](stream: S, stop: UIO[Unit]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case class models a stream that can be stopped from outside (with a stop method). It is not tied to subscriptions or anything else from zio-kafka. Therefore, I propose we rename this to something like StoppableStream
, or even simpler: StreamControl
.
/** | |
* Allows graceful shutdown of a stream, where no more records are being fetched but the in-flight records can continue | |
* to be processed and their offsets committed. | |
* | |
* @param stream | |
* The stream of partitions / records for this subscription | |
* @param stop | |
* Stop fetching records for the subscribed topic-partitions and end the associated streams, while allowing commits to | |
* proceed (consumer remains subscribed) | |
*/ | |
final private[consumer] case class SubscriptionStreamControl[S <: ZStream[_, _, _]](stream: S, stop: UIO[Unit]) | |
/** | |
* Models a stream with a graceful shutdown. | |
* | |
* In a graceful shutdown, the stream stops pulling elements from its source, but completes processing of already pulled stream elements. | |
* | |
* @param stream | |
* A stream that supports graceful shutdown | |
* @param stop | |
* Initiate a graceful shutdown of the stream | |
*/ | |
final private[consumer] case class StreamControl[S <: ZStream[_, _, _]](stream: S, stop: UIO[Unit]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering why this it is not like this:
case class StreamControl[R, E, A]](stream: ZStream[R, E, A], stop: UIO[Unit])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an opportunity to clean up the code that is using this with methods like map
:
case class StreamControl[R, E, A]](stream: ZStream[R, E, A], stop: UIO[Unit]) {
def map[R1 <: R, E1 >: E, B](f: ZStream[R, E, A] => ZStream[R1, E1, B]): StreamControl[R1, E1, B] =
StreamControl(f(stream), stop)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I like it
* Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown. | ||
* | ||
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by | ||
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight | ||
* messages. | ||
* | ||
* EXPERIMENTAL API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown. | |
* | |
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by | |
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight | |
* messages. | |
* | |
* EXPERIMENTAL API | |
* Like [[partitionedAssignmentStream]] but wraps the stream in an effect that allows graceful shutdown. | |
* | |
* When this effect is interrupted, the stream of assigned partitions ends, allowing the streams created by | |
* `withStream` to complete gracefully, thereby fully processing all buffered and/or in-flight | |
* stream elements. | |
* | |
* WARNING: this is an EXPERIMENTAL API and may disappear or change in an incompatible way without notice in any zio-kafka version. |
override def plainStream[R, K, V]( | ||
subscription: Subscription, | ||
keyDeserializer: Deserializer[R, K], | ||
valueDeserializer: Deserializer[R, V], | ||
bufferSize: Int | ||
bufferSize: Int = 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we extract this change to its own PR? This looks like something that is useful on its own.
Note to self: it may also affect documentation.
.timeout(shutdownTimeout) | ||
.someOrElseZIO( | ||
ZIO.logError( | ||
"Timeout joining withStream fiber in runWithGracefulShutdown. Not all pending commits may have been processed." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is my attempt at rewriting the message more towards the user's view. Not sure if this is entirely correct though.
"Timeout joining withStream fiber in runWithGracefulShutdown. Not all pending commits may have been processed." | |
"Timeout waiting for `withStream` to gracefully shut down. Not all in-flight records may have been processed." |
) | ||
) | ||
.tapErrorCause(cause => | ||
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this correctly, the problem is not that the join fails, but more likely the stream itself.
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) | |
ZIO.logErrorCause("Stream failed while awaiting its graceful shutdown", cause) |
// The fork and join is a workaround for https://github.com/zio/zio/issues/9288 for ZIO <= 2.1.12 | ||
.forkDaemon | ||
.flatMap(_.join) | ||
.tapErrorCause(cause => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: when we remove this workaround (I am actually in favor of that, we can document that we require zio 2.1.13+), do we then still need this second tapErrorCause
?
diagnostics.emit(Finalization.SubscriptionFinalized) | ||
} | ||
} yield stream | ||
} yield SubscriptionStreamControl( | ||
stream = stream.merge(ZStream.fromZIO(end.await).as(Take.end)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I read (line 72) this some alarm bells went of in my head.
PartitionStreamControl
registers which records (offsets) were pulled. This information is later used by the rebalance listener when rebalanceSafeCommit
is enabled.
When the 'end' gets merged into the stream provided by PartitionStreamControl
, it could be that the latter just pulled a chunk of records, but before the chunk was given downstream, the end is merged in. This causes downstream to never see those records, even though PartitionStreamControl
thinks those have been pulled. When this happens, the ending rebalance will wait for maxRebalanceDuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A solution could be to stop offering data from the runloop (see also https://github.com/zio/zio-kafka/pull/1201/files#r1986290475).
Implements functionality for gracefully stopping a stream for a single subscription: stop fetching records for the assigned topic-partitions but keep being subscribed so that offsets can still be committed. Intended to replace
stopConsumption
, which did not support multiple-subscription use cases.A new command
EndStreamsBySubscription
is introduced, which calls theend
method on thePartitionStreamControl
of streams matching a subscription. In the methodConsumer#runWithGracefulShutdown
we then wait for the user's stream to complete, before removing the subscription.This is experimental functionality, intended to replace
stopConsumption
at some point. Methods with this new functionality are offered besides existing methods to maintain compatibility.All the fiber and scope trickery proved to be very hard to get right (the lifetime of this PR is a testimony to that), and there may still be subtle issues here.This is now traced back to issue zio/zio#9288Implements some of #941.