-
Notifications
You must be signed in to change notification settings - Fork 156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add mapWithResource stream operator. #931
Conversation
715615c
to
a5e84a0
Compare
a5e84a0
to
c0ba681
Compare
docs/src/main/java/docs/ddata/protobuf/msg/TwoPhaseSetMessages.java
Outdated
Show resolved
Hide resolved
90c5969
to
26580ba
Compare
@@ -2225,7 +2225,7 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) = | |||
private val out = Outlet[Out]("StatefulMap.out") | |||
override val shape: FlowShape[In, Out] = FlowShape(in, out) | |||
|
|||
override protected def initialAttributes: Attributes = DefaultAttributes.statefulMap and SourceLocation.forLambda(f) | |||
override protected def initialAttributes: Attributes = Attributes(SourceLocation.forLambda(f)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed to get the tests in MapWithResourceSpec work, which need the name to be mapWithResource
* @since 1.1.0 | ||
*/ | ||
def mapWithResource[R, T]( | ||
create: java.util.function.Supplier[R], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked Flux, where a Callable
is used, which can throw exception.
But I think, we did not expect any exception to be throw ,so a Supplier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should assume create
call can never fail. Can you use Callable
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a user should handle the maybe exception explicitly, with some kind of try/retry
otherwise they will wondering , wow, why my stream just failed,ouch, I need somekind of retry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we be using one of the interfaces in https://github.com/apache/incubator-pekko/blob/b0fdac259bd57fdd481483f3fe9a7aec6e1ff38a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala which are deliberately designed to be used within Pekko because they support throwing exceptions in the apply
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eg: fs2's Stream#bracket
where def bracket[F[_], R](acquire: F[R])(release: R => F[Unit]): Stream[F, R]
the F
is not limited to Try
too. so does the Resource.make
.
And if a use change the checked exception to a runtime exception, the flow will fails too, eg: using @SneakyThrows
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe ,under another package
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a technical reason not to have overloaded methods in the same class? Java and Scala have good polymorphism support. There are limitations, of course - but can you name them if there is a technical reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this to my local copy of javadsl.Flow.
def mapWithResource[R, T](
create: java.util.function.Supplier[R],
f: java.util.function.BiFunction[R, Out, T],
close: java.util.function.Function[R, Optional[T]]): javadsl.Flow[In, T, Mat] = ???
def mapWithResource[R, T](
create: function.Creator[R],
f: java.util.function.BiFunction[R, Out, T],
close: java.util.function.Function[R, Optional[T]]): javadsl.Flow[In, T, Mat] = ???
This compiled ok. Maybe there are other issues when implementations are added but so far, it looks feasible to use overloaded methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will not work with lambda
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overloading like this will not work. I also realised that close
needs to use a org.apache.pekko.stream.javadsl.functional.Procedure
just likes its done here in other parts of the code
("d" * 100 + "\n") * 10 + | ||
("e" * 100 + "\n") * 10 + | ||
("f" * 100 + "\n") * 10 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be ('a' to 'f') .map(_.toString) .map(_*100).map(_+"/n") ....etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was copied from another test iirc, it's test code , and very explicitly, anyway, thanks
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
Show resolved
Hide resolved
* @param close function that closes the resource, optionally outputting a last element | ||
* @since 1.1.0 | ||
*/ | ||
def mapWithResource[R, T](create: () => R)(f: (R, Out) => T, close: R => Option[T]): Repr[T] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For close function
Using for scala3
scala.util.control.Exception.ultimately for scala2
Do we need to consider such usage case for this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you like elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking scala3 as an example, when I close or use a stream, I will write this code
Using (Source. from (File...)) {
// Todo something
}
I won't implement a close function. as also to Java8 try (resource) {}
I hope Pekko can do the same. Can we use Using feature
and remove the params close:R=>Option[T]
I haven't seen the implementation principle of Using in scala3
yet, so I'm not sure how to define the function mapWithResource
with Using
, maybe further optimization in another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see , that would be nice addition.
- directly support for Java's AutoCloseable.
- directly support for Scala's Using.
But I think those two doesn't related to this PR, because the nice feature of this operator is "optional emit with the resource/final signal".
But and what your suggestion can come up with another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@laglangyue Would you create an issue about your idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and new functions should conform to the pre-existing style.
This is the critical thing, anything added needs to follow the existing style. I am not against changing the existing style, but a proper discussion needs to be made rather than it being tacked onto an existing PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's wait a little longer, I will change to creator/callable
api this week, but I have to say, Java developers want java.util.*
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aggre with pjfanning. This PR should be more conservative. Annother PR will discuss the new Java APIs or method-overload to more confortable for java developer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but I have to say, Java developers want
java.util.*
.
That may be the case (although I have no idea what the technical reason why is) but its outside the scope of this PR. Feel free to make a github discussion on it though!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started #960 - please feel free to adjust the description in that discussion.
26580ba
to
7021b33
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this PR really needs some javadsl tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As has been discussed #931 (comment)
- The
create
parameter needs to be changed to aorg.apache.pekko.stream.javadsl.functional.Creator
since its possible that a supplied lambda can throw an exception (i.e.IOException
when creating a file, see here for an existing example) - The
close
parameter needs to be changed to aorg.apache.pekko.stream.javadsl.functional.Procedure
since its possible that a supplied lambda can throw an exception (i.e. think of callingclose()
on a file handle which can throw anIOException
), see here for an existing example.
Please lets not get into yet another tangent about whether to use java.util.*
@FunctionalInterface
functions (even as overloaded parameters). New functionality has to follow the existing style/design of Pekko unless there is a strong exceptional reason why and in this case there isn't. If you want to discuss this point, feel free to do so on github discussion/mailing list.
7021b33
to
b4ff365
Compare
I've made the changes. Let's leave it at that for now. We can discuss in a separate post how future versions can provide a more Java-friendly API. |
b4ff365
to
dc64cd9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
dc64cd9
to
4069585
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Motivation:
Makes every stream element can be handled with resource.
Like Flux's
using
operator.refs:
#786
and origin Akka issue
refs:akka/akka#31355
refs:akka/akka#22730