Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add mapWithResource stream operator. #931

Merged
merged 1 commit into from
Jan 16, 2024

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Jan 13, 2024

Motivation:
Makes every stream element can be handled with resource.

Like Flux's using operator.

refs:
#786

and origin Akka issue
refs:akka/akka#31355
refs:akka/akka#22730

@He-Pin He-Pin requested a review from jxnu-liguobin January 13, 2024 07:45
@He-Pin He-Pin added this to the 1.1.0 milestone Jan 13, 2024
@He-Pin He-Pin force-pushed the mapWithResource branch 3 times, most recently from 90c5969 to 26580ba Compare January 13, 2024 19:01
@He-Pin He-Pin requested a review from pjfanning January 13, 2024 19:03
@@ -2225,7 +2225,7 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
private val out = Outlet[Out]("StatefulMap.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)

override protected def initialAttributes: Attributes = DefaultAttributes.statefulMap and SourceLocation.forLambda(f)
override protected def initialAttributes: Attributes = Attributes(SourceLocation.forLambda(f))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to get the tests in MapWithResourceSpec work, which need the name to be mapWithResource

* @since 1.1.0
*/
def mapWithResource[R, T](
create: java.util.function.Supplier[R],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked Flux, where a Callable is used, which can throw exception.
But I think, we did not expect any exception to be throw ,so a Supplier.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should assume create call can never fail. Can you use Callable instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a user should handle the maybe exception explicitly, with some kind of try/retry otherwise they will wondering , wow, why my stream just failed,ouch, I need somekind of retry.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be using one of the interfaces in https://github.com/apache/incubator-pekko/blob/b0fdac259bd57fdd481483f3fe9a7aec6e1ff38a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala which are deliberately designed to be used within Pekko because they support throwing exceptions in the apply?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eg: fs2's Stream#bracket where def bracket[F[_], R](acquire: F[R])(release: R => F[Unit]): Stream[F, R] the F is not limited to Try too. so does the Resource.make.

And if a use change the checked exception to a runtime exception, the flow will fails too, eg: using @SneakyThrows .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe ,under another package

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a technical reason not to have overloaded methods in the same class? Java and Scala have good polymorphism support. There are limitations, of course - but can you name them if there is a technical reason?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to my local copy of javadsl.Flow.

  def mapWithResource[R, T](
      create: java.util.function.Supplier[R],
      f: java.util.function.BiFunction[R, Out, T],
      close: java.util.function.Function[R, Optional[T]]): javadsl.Flow[In, T, Mat] = ???

  def mapWithResource[R, T](
      create: function.Creator[R],
      f: java.util.function.BiFunction[R, Out, T],
      close: java.util.function.Function[R, Optional[T]]): javadsl.Flow[In, T, Mat] = ???

This compiled ok. Maybe there are other issues when implementations are added but so far, it looks feasible to use overloaded methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not work with lambda

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overloading like this will not work. I also realised that close needs to use a org.apache.pekko.stream.javadsl.functional.Procedure just likes its done here in other parts of the code

("d" * 100 + "\n") * 10 +
("e" * 100 + "\n") * 10 +
("f" * 100 + "\n") * 10
}
Copy link
Contributor

@laglangyue laglangyue Jan 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be ('a' to 'f') .map(_.toString) .map(_*100).map(_+"/n") ....etc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was copied from another test iirc, it's test code , and very explicitly, anyway, thanks

* @param close function that closes the resource, optionally outputting a last element
* @since 1.1.0
*/
def mapWithResource[R, T](create: () => R)(f: (R, Out) => T, close: R => Option[T]): Repr[T] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For close function
Using for scala3
scala.util.control.Exception.ultimately for scala2
Do we need to consider such usage case for this function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like elaborate?

Copy link
Contributor

@laglangyue laglangyue Jan 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking scala3 as an example, when I close or use a stream, I will write this code

Using (Source. from (File...)) {
  // Todo something
}

I won't implement a close function. as also to Java8 try (resource) {}
I hope Pekko can do the same. Can we use Using feature and remove the params close:R=>Option[T]
I haven't seen the implementation principle of Using in scala3 yet, so I'm not sure how to define the function mapWithResource with Using, maybe further optimization in another PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see , that would be nice addition.

  1. directly support for Java's AutoCloseable.
  2. directly support for Scala's Using.

But I think those two doesn't related to this PR, because the nice feature of this operator is "optional emit with the resource/final signal".

But and what your suggestion can come up with another PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@laglangyue Would you create an issue about your idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and new functions should conform to the pre-existing style.

This is the critical thing, anything added needs to follow the existing style. I am not against changing the existing style, but a proper discussion needs to be made rather than it being tacked onto an existing PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wait a little longer, I will change to creator/callable api this week, but I have to say, Java developers want java.util.*.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aggre with pjfanning. This PR should be more conservative. Annother PR will discuss the new Java APIs or method-overload to more confortable for java developer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I have to say, Java developers want java.util.*.

That may be the case (although I have no idea what the technical reason why is) but its outside the scope of this PR. Feel free to make a github discussion on it though!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started #960 - please feel free to adjust the description in that discussion.

@He-Pin He-Pin added t:stream Pekko Streams release notes Need to release note labels Jan 14, 2024
Copy link
Contributor

@pjfanning pjfanning left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this PR really needs some javadsl tests

Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As has been discussed #931 (comment)

  • The create parameter needs to be changed to a org.apache.pekko.stream.javadsl.functional.Creator since its possible that a supplied lambda can throw an exception (i.e. IOException when creating a file, see here for an existing example)
  • The close parameter needs to be changed to a org.apache.pekko.stream.javadsl.functional.Procedure since its possible that a supplied lambda can throw an exception (i.e. think of calling close() on a file handle which can throw an IOException), see here for an existing example.

Please lets not get into yet another tangent about whether to use java.util.* @FunctionalInterface functions (even as overloaded parameters). New functionality has to follow the existing style/design of Pekko unless there is a strong exceptional reason why and in this case there isn't. If you want to discuss this point, feel free to do so on github discussion/mailing list.

@He-Pin
Copy link
Member Author

He-Pin commented Jan 16, 2024

I've made the changes. Let's leave it at that for now. We can discuss in a separate post how future versions can provide a more Java-friendly API.

Copy link
Contributor

@mdedetrich mdedetrich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Copy link
Contributor

@pjfanning pjfanning left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Copy link
Contributor

@nvollmar nvollmar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@He-Pin He-Pin merged commit bd8ee25 into apache:main Jan 16, 2024
18 checks passed
@pjfanning pjfanning removed the release notes Need to release note label Aug 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants