-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful shutdown of a stream for a single subscription #1201
base: master
Are you sure you want to change the base?
Changes from all commits
7a9a6f8
c8196b3
583d112
f55c898
6f86958
9eb989b
9a32c74
98df970
29f8e44
1215c43
97d7e6f
336aa8d
361cfec
4ee9696
dfa0afa
15ec438
885d9c9
1408148
ed326a2
252cc3a
9954dda
add21e9
8ec18c0
c409368
2dbddfc
7838929
c9e48ab
5d334e0
60464b6
258168d
6e25cd4
8d19e16
f1edcab
f19417b
5365aa0
f977001
c54b3e9
33a6d82
a155267
15e041f
7c21f82
f5e42c5
9a31569
27f033e
108b285
eaae8af
c862686
ff4ea7f
bbbfe48
a75a78e
5fec195
699e6e8
aafd4ec
34f5110
95f9bef
f33dc34
3cb1eee
87eadd8
31cd086
a6d2afa
1835758
6c28b89
e649753
348b01e
c9f1597
4a9b6f6
3c0cfd1
90ca347
edb7005
e400012
e83bc89
efd3937
e29d63e
f19f90d
c71a08f
5c27da7
9901b16
a9925d5
dac1865
ddbd576
8e461ee
5848e9c
143f914
d5fc7bb
d6f485c
5ef97ef
bec0e25
c7b6879
5e2031e
4c02d08
e4f4e11
46f2142
43a0c8a
a346981
585ee42
f899758
07c0bf3
00d868d
4e822fa
af901eb
298c293
f9f31ed
d87acd1
efe709a
83c48fd
f1610f9
be05e25
09a6375
adf3965
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -74,6 +74,28 @@ trait Consumer { | |||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V] | ||||||||||||||||||||||||||||||
): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown. | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does |
||||||||||||||||||||||||||||||
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight | ||||||||||||||||||||||||||||||
* messages. | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* EXPERIMENTAL API | ||||||||||||||||||||||||||||||
Comment on lines
+78
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||
def withPartitionedAssignmentStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration = 15.seconds | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[ | ||||||||||||||||||||||||||||||
R, | ||||||||||||||||||||||||||||||
Throwable, | ||||||||||||||||||||||||||||||
Any | ||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||
): ZIO[R, Throwable, Any] | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Create a stream with messages on the subscribed topic-partitions by topic-partition | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
|
@@ -97,6 +119,28 @@ trait Consumer { | |||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V] | ||||||||||||||||||||||||||||||
): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown. | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by | ||||||||||||||||||||||||||||||
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight | ||||||||||||||||||||||||||||||
* messages. | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* EXPERIMENTAL API | ||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||
def withPartitionedStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration = 15.seconds | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[ | ||||||||||||||||||||||||||||||
R, | ||||||||||||||||||||||||||||||
Throwable, | ||||||||||||||||||||||||||||||
Any | ||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||
): ZIO[R, Throwable, Any] | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Create a stream with all messages on the subscribed topic-partitions | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
|
@@ -115,13 +159,33 @@ trait Consumer { | |||||||||||||||||||||||||||||
* On completion of the stream, the consumer is unsubscribed. In case of multiple subscriptions, the total consumer | ||||||||||||||||||||||||||||||
* subscription is changed to exclude this subscription. | ||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
def plainStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
bufferSize: Int = 4 | ||||||||||||||||||||||||||||||
): ZStream[R, Throwable, CommittableRecord[K, V]] | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by | ||||||||||||||||||||||||||||||
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight | ||||||||||||||||||||||||||||||
* messages. | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* EXPERIMENTAL API | ||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||
def withPlainStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
bufferSize: Int = 4, | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration = 15.seconds | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any] | ||||||||||||||||||||||||||||||
): ZIO[R, Throwable, Any] | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit | ||||||||||||||||||||||||||||||
* requests. | ||||||||||||||||||||||||||||||
|
@@ -392,7 +456,8 @@ private[consumer] final class ConsumerLive private[consumer] ( | |||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
ZStream.unwrapScoped { | ||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||
stream <- runloopAccess.subscribe(subscription) | ||||||||||||||||||||||||||||||
streamControl <- runloopAccess.subscribe(subscription) | ||||||||||||||||||||||||||||||
stream = streamControl.stream | ||||||||||||||||||||||||||||||
} yield stream | ||||||||||||||||||||||||||||||
.map(_.exit) | ||||||||||||||||||||||||||||||
.flattenExitOption | ||||||||||||||||||||||||||||||
|
@@ -410,24 +475,161 @@ private[consumer] final class ConsumerLive private[consumer] ( | |||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
private def partitionedAssignmentStreamWithControl[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V] | ||||||||||||||||||||||||||||||
): ZIO[Scope, Throwable, StreamControl[ | ||||||||||||||||||||||||||||||
R, | ||||||||||||||||||||||||||||||
Throwable, | ||||||||||||||||||||||||||||||
Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] | ||||||||||||||||||||||||||||||
]] = { | ||||||||||||||||||||||||||||||
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) | ||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||
streamControl <- runloopAccess.subscribe(subscription) | ||||||||||||||||||||||||||||||
} yield streamControl.map( | ||||||||||||||||||||||||||||||
_.map(_.exit).flattenExitOption.map { | ||||||||||||||||||||||||||||||
_.collect { | ||||||||||||||||||||||||||||||
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => | ||||||||||||||||||||||||||||||
val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = | ||||||||||||||||||||||||||||||
if (onlyByteArraySerdes) | ||||||||||||||||||||||||||||||
partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] | ||||||||||||||||||||||||||||||
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
tp -> stream | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
override def withPartitionedAssignmentStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration = 15.seconds | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[ | ||||||||||||||||||||||||||||||
R, | ||||||||||||||||||||||||||||||
Throwable, | ||||||||||||||||||||||||||||||
Any | ||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||
): ZIO[R, Throwable, Any] = runWithGracefulShutdown[R, Throwable, Chunk[ | ||||||||||||||||||||||||||||||
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]]) | ||||||||||||||||||||||||||||||
]]( | ||||||||||||||||||||||||||||||
partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer), | ||||||||||||||||||||||||||||||
shutdownTimeout | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
override def partitionedStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V] | ||||||||||||||||||||||||||||||
): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] = | ||||||||||||||||||||||||||||||
): Stream[ | ||||||||||||||||||||||||||||||
Throwable, | ||||||||||||||||||||||||||||||
( | ||||||||||||||||||||||||||||||
TopicPartition, | ||||||||||||||||||||||||||||||
ZStream[R, Throwable, CommittableRecord[K, V]] | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
] = | ||||||||||||||||||||||||||||||
partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
override def withPartitionedStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration = 15.seconds | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[ | ||||||||||||||||||||||||||||||
R, | ||||||||||||||||||||||||||||||
Throwable, | ||||||||||||||||||||||||||||||
Any | ||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||
): ZIO[R, Throwable, Any] = | ||||||||||||||||||||||||||||||
withPartitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { stream => | ||||||||||||||||||||||||||||||
withStream(stream.flattenChunks) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
override def plainStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
bufferSize: Int | ||||||||||||||||||||||||||||||
bufferSize: Int = 4 | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we extract this change to its own PR? This looks like something that is useful on its own. Note to self: it may also affect documentation. |
||||||||||||||||||||||||||||||
): ZStream[R, Throwable, CommittableRecord[K, V]] = | ||||||||||||||||||||||||||||||
partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar( | ||||||||||||||||||||||||||||||
n = Int.MaxValue, | ||||||||||||||||||||||||||||||
bufferSize = bufferSize | ||||||||||||||||||||||||||||||
)(_._2) | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
override def withPlainStream[R, K, V]( | ||||||||||||||||||||||||||||||
subscription: Subscription, | ||||||||||||||||||||||||||||||
keyDeserializer: Deserializer[R, K], | ||||||||||||||||||||||||||||||
valueDeserializer: Deserializer[R, V], | ||||||||||||||||||||||||||||||
bufferSize: Int = 4, | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration = 15.seconds | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any] | ||||||||||||||||||||||||||||||
): ZIO[R, Throwable, Any] = | ||||||||||||||||||||||||||||||
withPartitionedStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { partitionedStream => | ||||||||||||||||||||||||||||||
withStream( | ||||||||||||||||||||||||||||||
partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2) | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||
* Takes a StreamControl for some stream and runs the given ZIO workflow on that stream such that, when interrupted, | ||||||||||||||||||||||||||||||
* stops fetching records and gracefully waits for the ZIO workflow to complete. | ||||||||||||||||||||||||||||||
* | ||||||||||||||||||||||||||||||
* @param streamControl | ||||||||||||||||||||||||||||||
* Result of `withPlainStream`, `withPartitionedStream` or `withPartitionedAssignmentStream` | ||||||||||||||||||||||||||||||
* @param shutdownTimeout | ||||||||||||||||||||||||||||||
* Timeout for the workflow to complete after initiating the graceful shutdown | ||||||||||||||||||||||||||||||
* @param withStream | ||||||||||||||||||||||||||||||
* Takes the stream as input and returns a ZIO workflow that processes the stream. As in most programs the given | ||||||||||||||||||||||||||||||
* workflow runs until an external interruption, the result value (Any type) is meaningless. `withStream` is | ||||||||||||||||||||||||||||||
* typically something like `stream => stream.mapZIO(record => ZIO.debug(record)).mapZIO(_.offset.commit)` | ||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||
private def runWithGracefulShutdown[R, E, A]( | ||||||||||||||||||||||||||||||
streamControl: ZIO[Scope, E, StreamControl[R, E, A]], | ||||||||||||||||||||||||||||||
shutdownTimeout: Duration | ||||||||||||||||||||||||||||||
)( | ||||||||||||||||||||||||||||||
withStream: ZStream[R, E, A] => ZIO[R, E, Any] | ||||||||||||||||||||||||||||||
): ZIO[R, E, Any] = | ||||||||||||||||||||||||||||||
ZIO.scoped[R] { | ||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||
control <- streamControl | ||||||||||||||||||||||||||||||
fib <- | ||||||||||||||||||||||||||||||
withStream(control.stream) | ||||||||||||||||||||||||||||||
.onInterrupt( | ||||||||||||||||||||||||||||||
ZIO.logError("withStream in runWithGracefulShutdown was interrupted, this should not happen") | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
.tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause)) | ||||||||||||||||||||||||||||||
.forkScoped | ||||||||||||||||||||||||||||||
result <- | ||||||||||||||||||||||||||||||
fib.join.onInterrupt( | ||||||||||||||||||||||||||||||
control.stop *> | ||||||||||||||||||||||||||||||
fib.join | ||||||||||||||||||||||||||||||
.timeout(shutdownTimeout) | ||||||||||||||||||||||||||||||
.someOrElseZIO( | ||||||||||||||||||||||||||||||
ZIO.logError( | ||||||||||||||||||||||||||||||
"Timeout joining withStream fiber in runWithGracefulShutdown. Not all pending commits may have been processed." | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is my attempt at rewriting the message more towards the user's view. Not sure if this is entirely correct though.
Suggested change
|
||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
.tapErrorCause(cause => | ||||||||||||||||||||||||||||||
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand this correctly, the problem is not that the join fails, but more likely the stream itself.
Suggested change
|
||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
// The fork and join is a workaround for https://github.com/zio/zio/issues/9288 for ZIO <= 2.1.12 | ||||||||||||||||||||||||||||||
.forkDaemon | ||||||||||||||||||||||||||||||
.flatMap(_.join) | ||||||||||||||||||||||||||||||
.tapErrorCause(cause => | ||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: when we remove this workaround (I am actually in favor of that, we can document that we require zio 2.1.13+), do we then still need this second |
||||||||||||||||||||||||||||||
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
.ignore | ||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||
} yield result | ||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
override def stopConsumption: UIO[Unit] = | ||||||||||||||||||||||||||||||
ZIO.logDebug("stopConsumption called") *> | ||||||||||||||||||||||||||||||
runloopAccess.stopConsumption | ||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package zio.kafka.consumer | ||
import zio.UIO | ||
import zio.stream.ZStream | ||
|
||
/** | ||
* Allows graceful shutdown of a stream, where no more records are being fetched but the in-flight records can continue | ||
* to be processed and their offsets committed. | ||
* | ||
* @param stream | ||
* The stream of partitions / records for this subscription | ||
* @param stop | ||
* Stop fetching records for the subscribed topic-partitions and end the associated streams, while allowing commits to | ||
* proceed (consumer remains subscribed) | ||
*/ | ||
final private[consumer] case class StreamControl[-R, +E, +A](stream: ZStream[R, E, A], stop: UIO[Unit]) { | ||
def map[R1 <: R, E1 >: E, B](f: ZStream[R, E, A] => ZStream[R1, E1, B]): StreamControl[R1, E1, B] = | ||
StreamControl(f(stream), stop) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can extend this a bit more before we merge this PR.