Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown of a stream for a single subscription #1201

Open
wants to merge 109 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
109 commits
Select commit Hold shift + click to select a range
7a9a6f8
Draft of interface changes
svroonland Mar 24, 2024
c8196b3
Remove deprecated for now
svroonland Mar 24, 2024
583d112
Draft implementation
svroonland Mar 24, 2024
f55c898
Fix implementation + first test
svroonland Mar 24, 2024
6f86958
More tests copied from stopConsumption
svroonland Mar 27, 2024
9eb989b
Alternative interface, workaround inability to deconstruct tuples in …
svroonland Mar 28, 2024
9a32c74
Formatting
svroonland Mar 28, 2024
98df970
Fix doc
svroonland Mar 28, 2024
29f8e44
Add test
svroonland Mar 28, 2024
1215c43
Tweak docs
svroonland Mar 28, 2024
97d7e6f
Add test
svroonland Mar 28, 2024
336aa8d
Move to separate file
svroonland Mar 29, 2024
361cfec
runWithGracefulShutdown
svroonland Mar 30, 2024
4ee9696
Add timeout
svroonland Mar 30, 2024
dfa0afa
Process PR comments
svroonland Apr 1, 2024
15ec438
Fix type constraints
svroonland Apr 1, 2024
885d9c9
Only offer *streamWithGracefulShutdown methods
svroonland Apr 3, 2024
1408148
Pause a partition when its stream is ended
erikvanoosten Apr 13, 2024
ed326a2
More tests
svroonland Apr 14, 2024
252cc3a
Add default value for bufferSize consistently
svroonland Apr 14, 2024
9954dda
Fix race condition between join and timeout, leading to unwanted inte…
svroonland Apr 14, 2024
add21e9
Fix test
svroonland Apr 14, 2024
8ec18c0
Make SubscriptionStreamControl a case class
svroonland Apr 14, 2024
c409368
Update doc
svroonland Apr 14, 2024
2dbddfc
Cleanup
svroonland Apr 14, 2024
7838929
Simplify subscribe
svroonland Apr 15, 2024
c9e48ab
requireRunning false
svroonland Apr 15, 2024
5d334e0
Log unexpected interruption
svroonland Apr 20, 2024
60464b6
Log more
svroonland Apr 20, 2024
258168d
Use partitionedStream
svroonland Apr 21, 2024
6e25cd4
Update pendingRequests and assignedStreams
svroonland Apr 21, 2024
8d19e16
Formatting
erikvanoosten Apr 27, 2024
f1edcab
Fix linting
erikvanoosten Apr 27, 2024
f19417b
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland May 10, 2024
5365aa0
Merge branch 'master' into subscription-stream-control
svroonland May 11, 2024
f977001
Merge branch 'master' into subscription-stream-control
svroonland May 19, 2024
c54b3e9
This works
svroonland May 20, 2024
33a6d82
This works with timeout
svroonland May 20, 2024
a155267
Remove unused annotation
svroonland May 20, 2024
15e041f
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 5, 2024
7c21f82
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Jun 16, 2024
f5e42c5
Merge branch 'master' into subscription-stream-control
svroonland Jul 14, 2024
9a31569
Small improvements to the Producer (#1272)
erikvanoosten Jul 14, 2024
27f033e
Document metrics and consumer tuning based on metrics (#1280)
erikvanoosten Jul 14, 2024
108b285
Add alternative fetch strategy for many partitions (#1281)
erikvanoosten Jul 16, 2024
eaae8af
Alternative producer implementation (#1285)
erikvanoosten Jul 18, 2024
c862686
Prevent users from enabling auto commit (#1290)
erikvanoosten Jul 24, 2024
ff4ea7f
Update scalafmt-core to 3.8.3 (#1291)
zio-scala-steward[bot] Jul 26, 2024
bbbfe48
Upgrade to 2.1.7+11-854102ae-SNAPSHOT with ZStream finalization fix
svroonland Aug 10, 2024
a75a78e
Add sonatype snapshots
svroonland Aug 10, 2024
5fec195
Bump ZIO version
svroonland Oct 10, 2024
699e6e8
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 10, 2024
aafd4ec
Revert stuff
svroonland Oct 10, 2024
34f5110
Bump
svroonland Oct 20, 2024
95f9bef
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Oct 20, 2024
f33dc34
Fix race condition when removing subscription
svroonland Nov 2, 2024
3cb1eee
Tweak
svroonland Nov 2, 2024
87eadd8
Tweak
svroonland Nov 2, 2024
31cd086
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
a6d2afa
Increase timeout
svroonland Nov 2, 2024
1835758
This seems to work
svroonland Nov 2, 2024
6c28b89
Cleanup
svroonland Nov 2, 2024
e649753
Cleanup
svroonland Nov 2, 2024
348b01e
Restore old methods so we can offer this as experimental functionality
svroonland Nov 2, 2024
c9f1597
Rename methods + add some doc
svroonland Nov 2, 2024
4a9b6f6
More renames
svroonland Nov 2, 2024
3c0cfd1
Fix rebalanceSafeCommits test timing out
svroonland Nov 2, 2024
90ca347
Forgot to commit this file
svroonland Nov 2, 2024
edb7005
Fix shutdown behavior
svroonland Nov 2, 2024
e400012
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 2, 2024
e83bc89
Apply suggestions from code review
svroonland Nov 2, 2024
efd3937
Restore stuff
svroonland Nov 2, 2024
e29d63e
Cleanup
svroonland Nov 2, 2024
f19f90d
Update docs/consuming-kafka-topics-using-zio-streams.md
svroonland Nov 3, 2024
c71a08f
PR comments
svroonland Nov 3, 2024
5c27da7
Do not empty commits when stopping all streams
svroonland Nov 5, 2024
9901b16
Revert change
svroonland Nov 5, 2024
a9925d5
Add assertion before poll, add some documenting comments
svroonland Nov 9, 2024
dac1865
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 9, 2024
ddbd576
Stronger test
svroonland Nov 9, 2024
8e461ee
Do not clear assignedStreams when ending streams by subscription (ins…
svroonland Nov 9, 2024
5848e9c
Fix interruption issue
svroonland Nov 9, 2024
143f914
Log timeout error + cleanup
svroonland Nov 9, 2024
d5fc7bb
Fix test compilation withFilter issue
svroonland Nov 9, 2024
d6f485c
Fix doc syntax
svroonland Nov 9, 2024
5ef97ef
Fix test
svroonland Nov 9, 2024
bec0e25
Update comment
svroonland Nov 10, 2024
c7b6879
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Nov 16, 2024
5e2031e
Comment workaround
svroonland Nov 16, 2024
4c02d08
Merge branch 'master' into subscription-stream-control
svroonland Nov 16, 2024
e4f4e11
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Feb 16, 2025
46f2142
Tweak
svroonland Feb 16, 2025
43a0c8a
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Feb 22, 2025
a346981
Fix merge error + test
svroonland Feb 22, 2025
585ee42
Add experimental notes
svroonland Feb 22, 2025
f899758
Allow returning a value from with*Stream methods
svroonland Feb 22, 2025
07c0bf3
More logging
svroonland Mar 1, 2025
00d868d
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 1, 2025
4e822fa
Make debugging easier
svroonland Mar 1, 2025
af901eb
No infinite retry
svroonland Mar 1, 2025
298c293
Restore log level
svroonland Mar 1, 2025
f9f31ed
Use KafkaTestUtils for creating topic
svroonland Mar 1, 2025
d87acd1
Unused imports
svroonland Mar 1, 2025
efe709a
Remove usage of stopConsumption
svroonland Mar 1, 2025
83c48fd
Rearrange all stopConsumption tests in one suite
svroonland Mar 1, 2025
f1610f9
Revert "Allow returning a value from with*Stream methods"
svroonland Mar 1, 2025
be05e25
Small refactoring
svroonland Mar 1, 2025
09a6375
Merge remote-tracking branch 'origin/master' into subscription-stream…
svroonland Mar 9, 2025
adf3965
Alternative type parameters for StreamControl
svroonland Mar 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Log more
svroonland authored and erikvanoosten committed Apr 27, 2024

Verified

This commit was signed with the committer’s verified signature.
erikvanoosten Erik van Oosten
commit 60464b6723de2301723d02942ea533b59bb179d0
14 changes: 9 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
@@ -726,11 +726,15 @@ private[consumer] final class ConsumerLive private[consumer] (
fib <- withStream(control.stream)
.onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen"))
.forkDaemon
result <- fib.join.onInterrupt(
control.stop *> fib.join.disconnect
.timeout(shutdownTimeout)
.ignore
)
result <-
fib.join.onInterrupt(
control.stop *> fib.join
.timeout(shutdownTimeout)
.tapErrorCause(cause =>
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
)
.ignore
)
} yield result
}