Skip to content

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Sep 20, 2025

Motivation:
refs: #2182

@He-Pin He-Pin added the t:stream Pekko Streams label Sep 20, 2025
@He-Pin He-Pin added this to the 2.0.0-M1 milestone Sep 20, 2025
@He-Pin He-Pin requested a review from Copilot September 20, 2025 15:54
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a new count operator to the Sink API that counts all incoming elements until upstream terminates, providing a convenient way to get the total element count from a stream.

Key changes:

  • Adds Sink.count operator to both Scala and Java APIs
  • Implements the counting functionality via a new CountSink GraphStage
  • Includes comprehensive test coverage and documentation

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.

Show a summary per file
File Description
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala Adds Scala API for Sink.count with documentation
stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala Adds Java API for Sink.count with documentation
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala Implements the counting logic as a GraphStage
stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala Adds default attributes for the count sink
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala Adds Scala test for the count operator
stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java Adds Java test for the count operator
docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java Adds Java documentation example
docs/src/main/paradox/stream/operators/index.md Updates operator index to include count
docs/src/main/paradox/stream/operators/Sink/count.md Adds complete documentation for the count operator

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@mdedetrich
Copy link
Contributor

@He-Pin I added an extra commit for a more performant solution to the java.lang.Long/scala.lang.Long issue. While it is true that scala.long.Long is a supertype of java.lang.Long, because CompletionStage is invariant it doesn't compile as is so you have to use asInstanceOf.

The Java SinkTest is passing locally without any issues, but ill wait till tomorrow morning (my time) for all the tests to pass so I can approve the PR.

Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@He-Pin He-Pin merged commit a1ade99 into main Sep 21, 2025
9 checks passed
@He-Pin He-Pin deleted the count branch September 21, 2025 08:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants