Graceful shutdown of a stream for a single subscription #1201

Open · wants to merge 109 commits into base: master

Changes from all commits · 109 commits
7a9a6f8
Draft of interface changes
svroonland Mar 24, 2024
c8196b3
Remove deprecated for now
svroonland Mar 24, 2024
583d112
Draft implementation
svroonland Mar 24, 2024
f55c898
Fix implementation + first test
svroonland Mar 24, 2024
6f86958
More tests copied from stopConsumption
svroonland Mar 27, 2024
9eb989b
Alternative interface, workaround inability to deconstruct tuples in …
svroonland Mar 28, 2024
9a32c74
Formatting
svroonland Mar 28, 2024
98df970
Fix doc
svroonland Mar 28, 2024
29f8e44
Add test
svroonland Mar 28, 2024
1215c43
Tweak docs
svroonland Mar 28, 2024
97d7e6f
Add test
svroonland Mar 28, 2024
336aa8d
Move to separate file
svroonland Mar 29, 2024
361cfec
runWithGracefulShutdown
svroonland Mar 30, 2024
4ee9696
Add timeout
svroonland Mar 30, 2024
dfa0afa
Process PR comments
svroonland Apr 1, 2024
15ec438
Fix type constraints
svroonland Apr 1, 2024
885d9c9
Only offer *streamWithGracefulShutdown methods
svroonland Apr 3, 2024
1408148
Pause a partition when its stream is ended
erikvanoosten Apr 13, 2024
ed326a2
More tests
svroonland Apr 14, 2024
252cc3a
Add default value for bufferSize consistently
svroonland Apr 14, 2024
9954dda
Fix race condition between join and timeout, leading to unwanted inte…
svroonland Apr 14, 2024
add21e9
Fix test
svroonland Apr 14, 2024
8ec18c0
Make SubscriptionStreamControl a case class
svroonland Apr 14, 2024
c409368
Update doc
svroonland Apr 14, 2024
2dbddfc
Cleanup
svroonland Apr 14, 2024
7838929
Simplify subscribe
svroonland Apr 15, 2024
c9e48ab
requireRunning false
svroonland Apr 15, 2024
5d334e0
Log unexpected interruption
svroonland Apr 20, 2024
60464b6
Log more
svroonland Apr 20, 2024
258168d
Use partitionedStream
svroonland Apr 21, 2024
6e25cd4
Update pendingRequests and assignedStreams
svroonland Apr 21, 2024
8d19e16
Formatting
erikvanoosten Apr 27, 2024
f1edcab
Fix linting
erikvanoosten Apr 27, 2024
f19417b
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland May 10, 2024
5365aa0
Merge branch 'master' into subscription-stream-control
svroonland May 11, 2024
f977001
Merge branch 'master' into subscription-stream-control
svroonland May 19, 2024
c54b3e9
This works
svroonland May 20, 2024
33a6d82
This works with timeout
svroonland May 20, 2024
a155267
Remove unused annotation
svroonland May 20, 2024
15e041f
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 5, 2024
7c21f82
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 16, 2024
f5e42c5
Merge branch 'master' into subscription-stream-control
svroonland Jul 14, 2024
9a31569
Small improvements to the Producer (#1272)
erikvanoosten Jul 14, 2024
27f033e
Document metrics and consumer tuning based on metrics (#1280)
erikvanoosten Jul 14, 2024
108b285
Add alternative fetch strategy for many partitions (#1281)
erikvanoosten Jul 16, 2024
eaae8af
Alternative producer implementation (#1285)
erikvanoosten Jul 18, 2024
c862686
Prevent users from enabling auto commit (#1290)
erikvanoosten Jul 24, 2024
ff4ea7f
Update scalafmt-core to 3.8.3 (#1291)
zio-scala-steward[bot] Jul 26, 2024
bbbfe48
Upgrade to 2.1.7+11-854102ae-SNAPSHOT with ZStream finalization fix
svroonland Aug 10, 2024
a75a78e
Add sonatype snapshots
svroonland Aug 10, 2024
5fec195
Bump ZIO version
svroonland Oct 10, 2024
699e6e8
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 10, 2024
aafd4ec
Revert stuff
svroonland Oct 10, 2024
34f5110
Bump
svroonland Oct 20, 2024
95f9bef
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 20, 2024
f33dc34
Fix race condition when removing subscription
svroonland Nov 2, 2024
3cb1eee
Tweak
svroonland Nov 2, 2024
87eadd8
Tweak
svroonland Nov 2, 2024
31cd086
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
a6d2afa
Increase timeout
svroonland Nov 2, 2024
1835758
This seems to work
svroonland Nov 2, 2024
6c28b89
Cleanup
svroonland Nov 2, 2024
e649753
Cleanup
svroonland Nov 2, 2024
348b01e
Restore old methods so we can offer this as experimental functionality
svroonland Nov 2, 2024
c9f1597
Rename methods + add some doc
svroonland Nov 2, 2024
4a9b6f6
More renames
svroonland Nov 2, 2024
3c0cfd1
Fix rebalanceSafeCommits test timing out
svroonland Nov 2, 2024
90ca347
Forgot to commit this file
svroonland Nov 2, 2024
edb7005
Fix shutdown behavior
svroonland Nov 2, 2024
e400012
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
e83bc89
Apply suggestions from code review
svroonland Nov 2, 2024
efd3937
Restore stuff
svroonland Nov 2, 2024
e29d63e
Cleanup
svroonland Nov 2, 2024
f19f90d
Update docs/consuming-kafka-topics-using-zio-streams.md
svroonland Nov 3, 2024
c71a08f
PR comments
svroonland Nov 3, 2024
5c27da7
Do not empty commits when stopping all streams
svroonland Nov 5, 2024
9901b16
Revert change
svroonland Nov 5, 2024
a9925d5
Add assertion before poll, add some documenting comments
svroonland Nov 9, 2024
dac1865
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 9, 2024
ddbd576
Stronger test
svroonland Nov 9, 2024
8e461ee
Do not clear assignedStreams when ending streams by subscription (ins…
svroonland Nov 9, 2024
5848e9c
Fix interruption issue
svroonland Nov 9, 2024
143f914
Log timeout error + cleanup
svroonland Nov 9, 2024
d5fc7bb
Fix test compilation withFilter issue
svroonland Nov 9, 2024
d6f485c
Fix doc syntax
svroonland Nov 9, 2024
5ef97ef
Fix test
svroonland Nov 9, 2024
bec0e25
Update comment
svroonland Nov 10, 2024
c7b6879
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 16, 2024
5e2031e
Comment workaround
svroonland Nov 16, 2024
4c02d08
Merge branch 'master' into subscription-stream-control
svroonland Nov 16, 2024
e4f4e11
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Feb 16, 2025
46f2142
Tweak
svroonland Feb 16, 2025
43a0c8a
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Feb 22, 2025
a346981
Fix merge error + test
svroonland Feb 22, 2025
585ee42
Add experimental notes
svroonland Feb 22, 2025
f899758
Allow returning a value from with*Stream methods
svroonland Feb 22, 2025
07c0bf3
More logging
svroonland Mar 1, 2025
00d868d
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 1, 2025
4e822fa
Make debugging easier
svroonland Mar 1, 2025
af901eb
No infinite retry
svroonland Mar 1, 2025
298c293
Restore log level
svroonland Mar 1, 2025
f9f31ed
Use KafkaTestUtils for creating topic
svroonland Mar 1, 2025
d87acd1
Unused imports
svroonland Mar 1, 2025
efe709a
Remove usage of stopConsumption
svroonland Mar 1, 2025
83c48fd
Rearrange all stopConsumption tests in one suite
svroonland Mar 1, 2025
f1610f9
Revert "Allow returning a value from with*Stream methods"
svroonland Mar 1, 2025
be05e25
Small refactoring
svroonland Mar 1, 2025
09a6375
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 9, 2025
adf3965
Alternative type parameters for StreamControl
svroonland Mar 9, 2025
30 changes: 30 additions & 0 deletions docs/consuming-kafka-topics-using-zio-streams.md
@@ -75,3 +75,33 @@ Each time a new partition is assigned to the consumer, a new partition stream is
ensures only a single stream is doing commits.

In this example, each partition is processed in parallel, on separate fibers, while a single fiber is doing commits.

## Controlled shutdown (experimental)

The examples above keep processing records forever, or until the fiber is interrupted, typically at application shutdown. At that point some records may be 'in-flight', i.e. being processed by one of the stages of your consumer stream. Those records may be only partially processed and their offsets may not be committed. For fast shutdown in an at-least-once processing scenario this is fine.

zio-kafka also supports a _graceful shutdown_, where the fetching of records for the subscribed topics/partitions is stopped, the streams are ended and all downstream stages are completed, allowing in-flight records to be fully processed.

Use the `with*Stream` variants of `plainStream`, `partitionedStream` and `partitionedAssignmentStream` for this purpose. These methods accept a function that processes the stream; when the method's effect is interrupted, the stream is ended gracefully.
> Review comment (Collaborator): We can extend this a bit more before we merge this PR.


As of zio-kafka 3.0, this functionality is experimental. If no issues are reported and the API has good usability, it will eventually be marked as stable.

```scala
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde.Serde

consumer.withPartitionedStream(
Subscription.topics("topic150"),
Serde.string,
Serde.string
) { stream =>
stream.flatMapPar(Int.MaxValue) { case (topicPartition, partitionStream) =>
partitionStream
.tap(record => printLine(s"key: ${record.key}, value: ${record.value}"))
.map(_.offset)
}
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.runDrain
}
```
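The mechanics behind this can be illustrated without zio-kafka at all. Below is a minimal plain-Scala sketch (all names are hypothetical, not part of the zio-kafka API): after `stop()` no new records are fetched, but records already buffered stay "in flight" and are still drained before `run` returns.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of graceful shutdown: after `stop()` no new records
// are fetched, but everything already buffered is still processed.
final class GracefulConsumerModel[A](buffer: ConcurrentLinkedQueue[A]) {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = stopped.set(true)

  /** Process records until stopped AND the buffer is fully drained. */
  def run(process: A => Unit): Int = {
    var handled = 0
    while (!stopped.get() || !buffer.isEmpty) {
      Option(buffer.poll()) match {
        case Some(a) => process(a); handled += 1
        case None    => () // nothing buffered yet; keep looping until stopped
      }
    }
    handled
  }
}
```

With three records buffered and `stop()` already called, `run` still drains all three before returning, mirroring how the `with*Stream` variants complete in-flight work on interruption.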
568 changes: 433 additions & 135 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala

Large diffs are not rendered by default.

208 changes: 205 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -74,6 +74,28 @@ trait Consumer {
valueDeserializer: Deserializer[R, V]
): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]]

/**
* Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown.
*
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by
> Review comment (Collaborator): What does "all partition streams are closed upstream" mean?

* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight
* messages.
*
* EXPERIMENTAL API
> Review comment on lines +78 to +84 (Collaborator). Suggested change:
* Like [[partitionedAssignmentStream]] but wraps the stream in a construct that ensures graceful shutdown.
*
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight
* messages.
*
* EXPERIMENTAL API
* Like [[partitionedAssignmentStream]] but wraps the stream in an effect that allows graceful shutdown.
*
* When this effect is interrupted, the stream of assigned partitions ends, allowing the streams created by
* `withStream` to complete gracefully, thereby fully processing all buffered and/or in-flight
* stream elements.
*
* WARNING: this is an EXPERIMENTAL API and may disappear or change in an incompatible way without notice in any zio-kafka version.

*/
def withPartitionedAssignmentStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
shutdownTimeout: Duration = 15.seconds
)(
withStream: ZStream[R, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[
R,
Throwable,
Any
]
): ZIO[R, Throwable, Any]

/**
* Create a stream with messages on the subscribed topic-partitions by topic-partition
*
Expand All @@ -97,6 +119,28 @@ trait Consumer {
valueDeserializer: Deserializer[R, V]
): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]

/**
* Like [[partitionedStream]] but wraps the stream in a construct that ensures graceful shutdown.
*
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight
* messages.
*
* EXPERIMENTAL API
*/
def withPartitionedStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
shutdownTimeout: Duration = 15.seconds
)(
withStream: ZStream[R, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[
R,
Throwable,
Any
]
): ZIO[R, Throwable, Any]

/**
* Create a stream with all messages on the subscribed topic-partitions
*
@@ -115,13 +159,33 @@
* On completion of the stream, the consumer is unsubscribed. In case of multiple subscriptions, the total consumer
* subscription is changed to exclude this subscription.
*/

def plainStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4
): ZStream[R, Throwable, CommittableRecord[K, V]]

/**
* Like [[plainStream]] but wraps the stream in a construct that ensures graceful shutdown
*
* When this effect is interrupted, all partition streams are closed upstream, allowing the stream created by
* `withStream` to complete gracefully all stream stages, thereby fully processing all buffered and/or in-flight
* messages.
*
* EXPERIMENTAL API
*/
def withPlainStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4,
shutdownTimeout: Duration = 15.seconds
)(
withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any]
): ZIO[R, Throwable, Any]

/**
* Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit
* requests.
@@ -392,7 +456,8 @@ private[consumer] final class ConsumerLive private[consumer] (

ZStream.unwrapScoped {
for {
stream <- runloopAccess.subscribe(subscription)
streamControl <- runloopAccess.subscribe(subscription)
stream = streamControl.stream
} yield stream
.map(_.exit)
.flattenExitOption
@@ -410,24 +475,161 @@
}
}

private def partitionedAssignmentStreamWithControl[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZIO[Scope, Throwable, StreamControl[
R,
Throwable,
Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]
]] = {
val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)
for {
streamControl <- runloopAccess.subscribe(subscription)
} yield streamControl.map(
_.map(_.exit).flattenExitOption.map {
_.collect {
case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
if (onlyByteArraySerdes)
partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

tp -> stream
}
}
)
}

override def withPartitionedAssignmentStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
shutdownTimeout: Duration = 15.seconds
)(
withStream: ZStream[R, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] => ZIO[
R,
Throwable,
Any
]
): ZIO[R, Throwable, Any] = runWithGracefulShutdown[R, Throwable, Chunk[
(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])
]](
partitionedAssignmentStreamWithControl(subscription, keyDeserializer, valueDeserializer),
shutdownTimeout
)(
withStream
)

override def partitionedStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V]
): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] =
): Stream[
Throwable,
(
TopicPartition,
ZStream[R, Throwable, CommittableRecord[K, V]]
)
] =
partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks

override def withPartitionedStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
shutdownTimeout: Duration = 15.seconds
)(
withStream: ZStream[R, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] => ZIO[
R,
Throwable,
Any
]
): ZIO[R, Throwable, Any] =
withPartitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { stream =>
withStream(stream.flattenChunks)
}

override def plainStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int
bufferSize: Int = 4
> Review comment (Collaborator): Shall we extract this change to its own PR? This looks like something that is useful on its own.
>
> Note to self: it may also affect documentation.

): ZStream[R, Throwable, CommittableRecord[K, V]] =
partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar(
n = Int.MaxValue,
bufferSize = bufferSize
)(_._2)

override def withPlainStream[R, K, V](
subscription: Subscription,
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
bufferSize: Int = 4,
shutdownTimeout: Duration = 15.seconds
)(
withStream: ZStream[R, Throwable, CommittableRecord[K, V]] => ZIO[R, Throwable, Any]
): ZIO[R, Throwable, Any] =
withPartitionedStream(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { partitionedStream =>
withStream(
partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2)
)
}

/**
* Takes a StreamControl for some stream and runs the given ZIO workflow on that stream such that, when interrupted,
* stops fetching records and gracefully waits for the ZIO workflow to complete.
*
* @param streamControl
* Result of `withPlainStream`, `withPartitionedStream` or `withPartitionedAssignmentStream`
* @param shutdownTimeout
* Timeout for the workflow to complete after initiating the graceful shutdown
* @param withStream
* Takes the stream as input and returns a ZIO workflow that processes the stream. As in most programs the given
* workflow runs until an external interruption, the result value (Any type) is meaningless. `withStream` is
* typically something like `stream => stream.mapZIO(record => ZIO.debug(record)).mapZIO(_.offset.commit)`
*/
private def runWithGracefulShutdown[R, E, A](
streamControl: ZIO[Scope, E, StreamControl[R, E, A]],
shutdownTimeout: Duration
)(
withStream: ZStream[R, E, A] => ZIO[R, E, Any]
): ZIO[R, E, Any] =
ZIO.scoped[R] {
for {
control <- streamControl
fib <-
withStream(control.stream)
.onInterrupt(
ZIO.logError("withStream in runWithGracefulShutdown was interrupted, this should not happen")
)
.tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause))
.forkScoped
result <-
fib.join.onInterrupt(
control.stop *>
fib.join
.timeout(shutdownTimeout)
.someOrElseZIO(
ZIO.logError(
"Timeout joining withStream fiber in runWithGracefulShutdown. Not all pending commits may have been processed."
> Review comment (Collaborator): Here is my attempt at rewriting the message more towards the user's view. Not sure if this is entirely correct though. Suggested change:
"Timeout joining withStream fiber in runWithGracefulShutdown. Not all pending commits may have been processed."
"Timeout waiting for `withStream` to gracefully shut down. Not all in-flight records may have been processed."

)
)
.tapErrorCause(cause =>
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
> Review comment (Collaborator): If I understand this correctly, the problem is not that the join fails, but more likely the stream itself. Suggested change:
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
ZIO.logErrorCause("Stream failed while awaiting its graceful shutdown", cause)

)
// The fork and join is a workaround for https://github.com/zio/zio/issues/9288 for ZIO <= 2.1.12
.forkDaemon
.flatMap(_.join)
.tapErrorCause(cause =>
> Review comment (Collaborator): Question: when we remove this workaround (I am actually in favor of that, we can document that we require ZIO 2.1.13+), do we then still need this second tapErrorCause?

ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
)
.ignore
)
} yield result
}
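Stripped of ZIO, the shutdown sequence in `runWithGracefulShutdown` boils down to: signal stop, then wait a bounded time for the processing fiber to finish draining. A thread-based sketch of just that sequencing (names hypothetical, not part of this PR):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical model: `stop` halts fetching; `done` is counted down by the
// worker when it finishes draining. Returns true if the worker completed
// within the timeout, false if we gave up waiting (the "timeout joining
// withStream fiber" branch above).
def shutdownGracefully(stop: () => Unit, done: CountDownLatch, timeoutMs: Long): Boolean = {
  stop()                                       // stop fetching new records
  done.await(timeoutMs, TimeUnit.MILLISECONDS) // bounded wait for drain
}
```

The real implementation adds error logging and the fork/join workaround for the ZIO issue noted in the code, but the control flow is the same.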

override def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
runloopAccess.stopConsumption
18 changes: 18 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/StreamControl.scala
@@ -0,0 +1,18 @@
package zio.kafka.consumer
import zio.UIO
import zio.stream.ZStream

/**
* Allows graceful shutdown of a stream, where no more records are being fetched but the in-flight records can continue
* to be processed and their offsets committed.
*
* @param stream
* The stream of partitions / records for this subscription
* @param stop
* Stop fetching records for the subscribed topic-partitions and end the associated streams, while allowing commits to
* proceed (consumer remains subscribed)
*/
final private[consumer] case class StreamControl[-R, +E, +A](stream: ZStream[R, E, A], stop: UIO[Unit]) {
def map[R1 <: R, E1 >: E, B](f: ZStream[R, E, A] => ZStream[R1, E1, B]): StreamControl[R1, E1, B] =
StreamControl(f(stream), stop)
}
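The key property of `map` here is that it transforms the stream while reusing the same `stop` handle, so shutdown control is never lost across transformations. A plain-Scala model of that shape (hypothetical, with `LazyList` standing in for `ZStream`):

```scala
// Simplified model of StreamControl: a stream paired with a stop action.
// `map` rewraps the transformed stream with the *same* stop handle.
final case class StreamControlModel[A](stream: LazyList[A], stop: () => Unit) {
  def map[B](f: LazyList[A] => LazyList[B]): StreamControlModel[B] =
    StreamControlModel(f(stream), stop)
}
```

Mapping over the stream (e.g. deserializing records, as `partitionedAssignmentStreamWithControl` does above) yields a new control whose `stop` still ends the original source.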