-
Notifications
You must be signed in to change notification settings - Fork 157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add optionalVia and unsafeOptionalDataVia #1422
Add optionalVia and unsafeOptionalDataVia #1422
Conversation
2c1d6a1
to
74d4c3b
Compare
Just for my understanding: why would you want to keep the
I think the CI "doc check" should tell you what to do, let's see if that's indeed the case :) |
OK so the error is slightly obscure, but that's where to put it. We might want to show a more friendly message in this case. |
b383f34
to
cc2775b
Compare
Yes, this is especially true of
Thanks, working on this now! |
Right, that is indeed a common use case. To nitpick: losing the cursor should never be a functional problem, but always be an optimization, right? Still, a worthwhile one. We might want to take some time to consider whether adding a
The advantage of those approaches is that you avoid the I don't have a strong opinion and am not a priori opposed to adding I wonder if the regular |
cc2775b
to
e4e5143
Compare
This isn't true in the case of Kafka if you perform side effects, if you don't submit a cursor for an element then it will continuously retry that element (while it is true that you can expect this with Kafka due to its architecture since exactly once isn't a thing, we don't want to unnecessarily perform side effects more than we have to)
We are using an actual Flow i.e.
This could work but not only is it more complicated, it can unnecessarily delay the side effects that my usecase relies on because as you note it creates gaps and if those gaps are long enough it can cause problems. Futhermore if the gap is longer than the commit time interval it creates the problem described before where the kafka topics will get resent (because cursor offset is not submitted) therefore creating multiple side effects.
I agree with this sentiment, but as you noted I added it in for consistency |
a260bbe
to
a78643f
Compare
I have just repushed with the following changes
|
a78643f
to
1da14f7
Compare
Sure, but that would strictly speaking still be an optimization, not a correctness issue, right? It is always possible that the side effect happens but the offset commit doesn't, so your application needs to be robust against that. I don't mean to diminish the fact that this optimization is useful, though ;) .
I'm not entirely sure
I don't see how it would delay side effects, but indeed it would delay the offset commits. I didn't know that would cause resends during normal operation, that seems like a reasonable motivation to have this 'unsafe' pattern. |
So to clarify technically in terms of correctness there indeed isn't an issue, what I was meaning to say is that creating additional side effects (because we don't commit a cursor in the specific time slice causing Kafka to replay topics) is something I want to avoid, especially since the service we are making a request to is rate limited one (futhermore in my case, the subet of elements that will make requests is a very small subset further exacerbating the issue) And yes you are right in that it won't delay side effects, was in a rush when writing the message and didn't think clearly |
f1a3867
to
83d4171
Compare
38b49b7
to
cba69b6
Compare
So the PR is now complete, I have double checked the generated docs and went through the PR a couple of times to make sure that everything looks okay. |
* applied. | ||
* @since 1.1.0 | ||
*/ | ||
def optionalVia[FIn, FOut, FViaOut, FMat, FViaMat, Mat](flow: Flow[FIn, Option[FOut], FMat], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like we can just change it to something like predicate: Out =>Boolen
for conditional transformation, instead of been forcing to lift to an Optional/Option
, which can introduce additional allocation.
I have not checked other stream implementations for this, but I would like to avoid of the Option/Optional but use a predicate instead, and it's some kind of groupBy
/`partition operator,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
BTW, do we need to confirm that the return type of Flow Out is Optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optional is necessary for the problem I am solving since
- I need to keep the elements that don't fulfill the predicate (i.e. I am using
StreamWithContext
/FlowWithContext
and I don't want to lose the context). Adding this toStream
/Flow
was largely for consistency reasons but it can still be useful for other scenarious. - I need to know which elements were filtered and which weren't after the matter of fact, so even if this was changed to a predicate it wouldn't ultimately save on performance as I would manually have to add the functionality to achieve this.
I am not against adding an alternative version that avoids boxing due to using a predicate but I would suggest to do that in a different PR, and as @raboof pointed out earlier that may not even need a specific utility function depending on whether point 2 is important or not.
It goes without saying that I have no issue with performance improvements that keep the same api/behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optional is just a kind of marker field, you can have an object with a field additionalProcessEnabled:Boolean
for that case.
I suggest we look at fs2/zio/reactor-core for the same case before we merge this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optional is just a kind of marker field, you can have an object with a field
additionalProcessEnabled:Boolean
for that case.
But in that case the returned types would then be different which means it would be a different API/overloaded function etc etc (which I don't have a problem with, we can just do it separately then?).
I suggest we look at fs2/zio/reactor-core for the same case before we merge this.
Sure, not that familiar with these API's but I can have a look at them a bit later
stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala
Outdated
Show resolved
Hide resolved
702973b
to
517a8aa
Compare
docs/src/main/paradox/stream/operators/Source-or-Flow/optionalVia.md
Outdated
Show resolved
Hide resolved
517a8aa
to
cefb907
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although I don't like this kind of optional operation, considering that the return type must be optional, I can also accept it.
@He-Pin @jxnu-liguobin If you aren't completely happy with the API, I can always add As the annotation suggests, its specifically designed for functions where we aren't 100% sure if its the right design. |
I will find a time to look into it and other implementation, sorry for the delay, a little busy at work |
s ~> broadcast.in | ||
|
||
broadcast.out(0) ~> filterAvailable ~> viaF ~> mapIntoOption ~> merge.in(0) | ||
broadcast.out(1) ~> filterUnavailable ~> merge.in(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about the elements' order of the new flow, will it keep the origin order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's some kind of the
Source.flatmapConcat { element =>
if predicate(element)
Source.single(element).via(yourFlow)
else
Source.single(element)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about the elements' order of the new flow, will it keep the origin order?
Yes, this is verified in tests
I think it's some kind of the
Source.flatmapConcat { element => if predicate(element) Source.single(element).via(yourFlow) else Source.single(element) }
This only works if the via
leaves the output type unchanged. If the output type changes then the true
branch will have a different element compared to the false
branch.
Thats why the wrapping in an Option
/Optional
type is needed because in the case where you don't apply the via
its just None
/Optional.empty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@pjfanning Thanks for approval. @He-Pin if there are issues with this approach we always have option to change it afterwards before main release |
This PR adds
optionalVia
/unsafeOptionalVia
functions which allows you to apply aviaFlow
to elements in the stream only if they are defined. This kind of utility function has cropped up due to needing to fulfill a pretty basic/common scenario, i.e. in my case I have aSource
of elements and aFlow
that performs a http request but I only want to apply the http request a subset of those elements.Tests have been added to the PR but I haven't added documentation, @raboof @jrudolph @He-Pin maybe you can let me know where to put it.