-
Notifications
You must be signed in to change notification settings - Fork 621
Add evalFlatten methods for stream of effects
#2851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4073,10 +4073,7 @@ object Stream extends StreamLowPriority { | |
| } | ||
|
|
||
| def outcomeJoiner: F[Unit] = | ||
| outcomes.stream | ||
| .evalMap(identity) | ||
| .compile | ||
| .drain | ||
| outcomes.stream.evalFlatten.compile.drain | ||
| .guaranteeCase { | ||
| case Outcome.Succeeded(_) => | ||
| stop(None) >> output.close.void | ||
|
|
@@ -4117,6 +4114,25 @@ object Stream extends StreamLowPriority { | |
| parJoin(Int.MaxValue) | ||
| } | ||
|
|
||
| /** Provides syntax for a stream of `F` effects. */ | ||
| implicit class StreamFOps[F[_], O](private val self: Stream[F, F[O]]) { | ||
|
|
||
| /** Sequences the inner effects into the stream. */ | ||
| def evalFlatten: Stream[F, O] = | ||
| self.evalMap(identity) | ||
|
|
||
| /** Evaluates up to `maxConcurrent` inner effects concurrently, emitting | ||
| * the results in order. | ||
| */ | ||
| def parEvalFlatten( | ||
| maxConcurrent: Int | ||
| )(implicit F: Concurrent[F]) = self.parEvalMap(maxConcurrent)(identity) | ||
bplommer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /** Evaluates all inner effects concurrently, emitting the results in order. | ||
| */ | ||
| def parEvalFlattenUnbounded(implicit F: Concurrent[F]) = self.parEvalMapUnbounded(identity) | ||
|
||
| } | ||
|
|
||
| /** Provides syntax for pure streams. */ | ||
| implicit final class PureOps[O](private val self: Stream[Pure, O]) extends AnyVal { | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For what it's worth, I've regretted using this shape of stream every time. You can run yourself out of memory really quickly if those inner
F[A]are nontrivial and you have a lot of them.I'm uncomfortable with encoding this in the library in a way that makes it easier to use, because I personally think this shape should be discouraged
Especially when the implementation here is pretty trivial.
And as a library user, I think it's a lot more clear to the code reader to see an inline
evalMap(identity)rather than yet another method they need to learn as part of the apiSo I'm a polite 👎 on the PR for those reasons
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good to know @Daenyth, I'd never think it could blow the memory. Would a suspend help resolve that? (at least to reduce the overhead of the non-trivial ones)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not because the issue is the memory used by having large
Chunk[IO[A]]It's also possibly my codebase just was doing something really silly and that's why it cost so much memory for that structure