[#3821] Fix FluxBuffer to request 1 when buffer is not modified #3822

Open
wants to merge 5 commits into
base: main

Conversation

Sage-Pierce

With this change, if a Set is used as the destination in Flux.buffer, the stream no longer hangs when a given buffer receives duplicates.

Previously, FluxBuffer was not taking the result of adding to the buffer into account. If adding to the buffer does not result in modifying it, an extra request(1) should be issued.
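To make the demand accounting concrete, here is a minimal plain-Java model of the situation described above. It is only a sketch under stated assumptions, not Reactor's implementation: the class and method names (ExtraRequestSketch, fillOneBuffer) and the `requestOnUnmodified` flag are hypothetical, and "demand" is a simple counter standing in for upstream requests. It assumes one downstream request for a buffer translates into `size` requested upstream elements.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model, not Reactor code: tracks upstream demand while
// filling a single Set-backed buffer of the given size.
final class ExtraRequestSketch {

    // Returns the filled buffer, or Optional.empty() if demand ran out
    // before the buffer reached `size` (the "hang" in this model).
    static Optional<Set<Integer>> fillOneBuffer(Iterator<Integer> upstream,
                                                int size,
                                                boolean requestOnUnmodified) {
        Set<Integer> buffer = new LinkedHashSet<>();
        long demand = size; // assumed: one buffer requested => `size` elements requested
        while (demand > 0 && upstream.hasNext()) {
            demand--; // one requested element is delivered
            boolean modified = buffer.add(upstream.next());
            if (!modified && requestOnUnmodified) {
                demand++; // models the extra request(1) for a duplicate
            }
            if (buffer.size() == size) {
                return Optional.of(buffer);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Without the extra request the duplicate uses up demand and the buffer never fills.
        System.out.println(fillOneBuffer(List.of(1, 1, 2, 3).iterator(), 2, false));
        // With the extra request the buffer fills with two distinct elements.
        System.out.println(fillOneBuffer(List.of(1, 1, 2, 3).iterator(), 2, true));
    }
}
```

In this model the input 1, 1, 2, 3 with size 2 stalls when the flag is off, because both requested elements were consumed but the Set holds only one of them, and yields [1, 2] when the flag is on.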

Fixes #3821

@Sage-Pierce Sage-Pierce requested a review from a team as a code owner June 1, 2024 16:04
@chemicL
Member

chemicL commented Jun 17, 2024

Hey, @Sage-Pierce !

I apologize that it took some time to respond. I recently reworked the bufferTimeout operator with fair backpressure, which is the reason for the delay.

Considering all existing buffer* implementations, I believe their behaviour should be consistent across the offering. I wonder how this requirement would influence other implementations, as it feels a bit risky in the face of concurrency. Can you share your thoughts? After looking at the codebase, it seems the authors didn't consider this scenario and assumed that no filtering happens and that once an item is added, the collection's size increases. I wonder whether including this check and handling is the way to go, or whether limiting the possible Collection types and combining with another operator to remove the duplicates would be a safer bet.

Let me know, thanks!

@Sage-Pierce
Author

Hi @chemicL 👋

No worries about the delay 😄 I didn't immediately look into the other buffer* permutations, as I precisely wanted to see what the feedback on this change was before undertaking that. I would agree that consistency across all the permutations is desirable. The weird bit is that all the other permutations use some form of sizing and/or either some asynchronous trigger or a predicate. It's not clear to me that this would actually be an issue in all those other permutations, and after some initial poking around, it doesn't seem to be.

> it feels a bit risky in the face of concurrency

I don't think that there's any more risk with this change regarding concurrency, since onNext is serialized, and the interaction with the buffer is already synchronized where necessary.

> I wonder whether including this check and handling is the way to go, or whether limiting the possible Collection types and combining with another operator to remove the duplicates would be a safer bet.

I don't think that there is currently another way to implement "give me n distinct items at a time". I initially thought there could be with something like window, but I couldn't/can't figure it out.

@chemicL
Member

chemicL commented Jun 18, 2024

> I don't think that there is currently another way to implement "give me n distinct items at a time". I initially thought there could be with something like window, but I couldn't/can't figure it out.

I came up with this:

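import java.util.HashSet;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Wrapper class added so the snippet compiles on its own; the name is arbitrary.
public class DistinctWindowExample {
	public static void main(String[] args) {
		// Each signal from this sink closes the current window and opens the next one.
		Sinks.Many<Integer> makeACut = Sinks.many().unicast().onBackpressureBuffer();
		Flux.just(1, 1, 2, 3, 4, 4, 4, 4, 5, 6, 6, 7)
				.window(makeACut.asFlux())
				.concatMap(w -> w.reduceWith(HashSet::new, (set, i) -> {
					set.add(i);
					// Once the window holds 2 distinct items, ask for a new window.
					if (set.size() == 2) {
						makeACut.tryEmitNext(0);
					}
					return set;
				}))
				// Drop windows that received no items.
				.filter(set -> !set.isEmpty())
				.doOnNext(System.out::println)
				.blockLast();
	}
}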

Just as a conversation starter :) I do imagine this doesn't look as nice and the performance would be incomparable.

I'll try to digest the rest of the comments and review the other implementations next. For now, can you also prepare a few sample {input, output} sets so that we know what the end goal is? I mean a sequence 1, 2, 1, 3 would yield [1, 2] and [1, 3] for n == 2, but would yield [1, 2, 3] for n == 3. Is that desired? Can you share some real world scenarios that come to mind that this would benefit? I tend to first try to understand the need and then try to work towards a solution that matches the expectations. This potential mismatch regarding expected supplied aggregator types is puzzling and it would be neat if we could comprehensively address this.

@Sage-Pierce
Author

Ah nice, that was abstractly what I had in my head, but I couldn't come up with that Sink feedback loop that you used 😄

> can you also prepare a few sample {input, output} sets so that we know what the end goal is? I mean a sequence 1, 2, 1, 3 would yield [1, 2] and [1, 3] for n == 2, but would yield [1, 2, 3] for n == 3. Is that desired?

The test I wrote for this changeset covers the basic expectation I think, and it looks like you already understand my intent quite well. I'll just format those and a few more below:

Given flux.buffer(2, LinkedHashSet::new):

  • [1, 2] -> [1, 2]
  • [1, 1, 2] -> [1, 2]
  • [1, 2, 1] -> [1, 2], [1]
  • [1, 2, 1, 3] -> [1, 2], [1, 3]
  • [1, 1, 2, 3] -> [1, 2], [3]
  • [2, 1, 1, 3] -> [2, 1], [1, 3]
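
The cases above can be modelled without Reactor. The following is a minimal plain-Java sketch of the intended size-triggered behaviour, assuming a buffer is emitted as soon as the Set holds n distinct elements and any non-empty remainder is emitted on completion. The names DistinctBufferSketch and bufferDistinct are hypothetical and not part of Reactor, and the sketch models only the emitted buffers, not backpressure.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not Reactor API: models the buffers that
// flux.buffer(n, LinkedHashSet::new) is expected to emit with this change.
final class DistinctBufferSketch {

    static <T> List<Set<T>> bufferDistinct(List<T> source, int n) {
        List<Set<T>> emitted = new ArrayList<>();
        Set<T> buffer = new LinkedHashSet<>();
        for (T item : source) {
            // add() returns false for a duplicate and leaves the buffer unchanged
            buffer.add(item);
            if (buffer.size() == n) {
                emitted.add(buffer);
                buffer = new LinkedHashSet<>();
            }
        }
        // On completion, emit whatever is left over
        if (!buffer.isEmpty()) {
            emitted.add(buffer);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(bufferDistinct(List.of(1, 2, 1, 3), 2)); // [[1, 2], [1, 3]]
        System.out.println(bufferDistinct(List.of(1, 1, 2, 3), 2)); // [[1, 2], [3]]
        System.out.println(bufferDistinct(List.of(1, 2, 1, 3), 3)); // [[1, 2, 3]]
    }
}
```

The last call covers the n == 3 question from the review: under this model 1, 2, 1, 3 collapses into a single buffer [1, 2, 3].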

> Can you share some real world scenarios that come to mind that this would benefit?

In my use case, I am iterating over time-bucketed data elements (from a database) and executing an I/O-bound process on them (a service call). That service call is maximally efficient when passed n distinct elements. The "buckets" are large, usually much larger than n, so the optimization I'm driving toward is iterating over n distinct elements at a time from each "bucket" and executing the I/O bound process for every n elements.

@chemicL
Member

chemicL commented Jun 19, 2024

Thanks.

For bufferTimeout:

Replace

.<Set<Integer>>buffer(2, HashSet::new)

with

.<Set<Integer>>bufferTimeout(2, Duration.ofDays(30), HashSet::new)

and you can observe the same outcome.

Out of the others, bufferWhen also works with a Supplier of Collection, but there's no specific size so this wouldn't be an issue for that variant.

@Sage-Pierce
Author

Sage-Pierce commented Jun 19, 2024

For the bufferTimeout behavior, that issue is slightly different from the issue being fixed here. In bufferTimeout, the number of buffered items is tracked independently of the buffer size. As a result, FluxBufferTimeout does not hang the way FluxBuffer does. However, it does make the size-triggered emission slightly different:

Given flux.bufferTimeout(2, Duration.ofSeconds(1), LinkedHashSet::new), the current behavior is (differences marked with *):

  • [1, 2] -> [1, 2]
  • *[1, 1, 2] -> [1], [2]
  • [1, 2, 1] -> [1, 2], [1]
  • [1, 2, 1, 3] -> [1, 2], [1, 3]
  • *[1, 1, 2, 3] -> [1], [2, 3]
  • [2, 1, 1, 3] -> [2, 1], [1, 3]
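
For comparison, the difference can be modelled in plain Java by counting received items instead of checking the Set's size. This is a sketch under assumptions, not FluxBufferTimeout's code: it assumes the timeout never fires, and the names ReceivedCountSketch and bufferByReceivedCount are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not Reactor API: emits a buffer after every n
// received items, regardless of how many of them the Set actually kept.
final class ReceivedCountSketch {

    static <T> List<Set<T>> bufferByReceivedCount(List<T> source, int n) {
        List<Set<T>> emitted = new ArrayList<>();
        Set<T> buffer = new LinkedHashSet<>();
        int received = 0; // tracked independently of buffer.size()
        for (T item : source) {
            buffer.add(item);
            received++;
            if (received == n) {
                emitted.add(buffer);
                buffer = new LinkedHashSet<>();
                received = 0;
            }
        }
        // On completion, emit whatever is left over
        if (!buffer.isEmpty()) {
            emitted.add(buffer);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(bufferByReceivedCount(List.of(1, 1, 2), 2));    // [[1], [2]]
        System.out.println(bufferByReceivedCount(List.of(1, 1, 2, 3), 2)); // [[1], [2, 3]]
    }
}
```

Because the counter advances on duplicates too, a buffer can be emitted with fewer than n elements, which matches the two starred cases.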

Due to that significant difference in how FluxBufferTimeout keeps track of the number of buffered elements, I'm not convinced this particular behavior can be made consistent with FluxBuffer without significant refactoring of either operator.

Given that there isn't actually a "hanging" issue with bufferTimeout and the usage of a Set buffer supplier, and given that FluxBuffer's upstream request count is decoupled from the number of elements actually contained in emitted buffers, I'd be inclined to treat this behavioral difference as a separate issue, and one which may not be a practical issue at all, since bufferTimeout users inherently have to be prepared to handle buffers with sizes less than the max buffer size.

Thoughts?

@chemicL
Member

chemicL commented Jun 21, 2024

Thanks for following up. I agree that bufferTimeout can be treated differently, thanks for sharing that view. Let's consider what would be a consistent approach to this that wouldn't surprise users of the API once we make changes around supporting Set. From the Javadoc perspective of bufferTimeout:

> (...) buffers that will be emitted by the returned {@link Flux} each time the buffer reaches a maximum size OR the maxTime {@link Duration} elapses (...)

In my view, the current behaviour when presented with a Collection that can return false upon add() doesn't deliver what the Javadoc promises. For one, this PR sounds justifiable from that perspective and thank you for starting the conversation :)

I think in order to merge something we'd need to cover all buffer* cases that accept a Supplier<C> where C is a Collection for the upstream type.

Since this currently works neither correctly nor consistently, we should make an effort to bring more clarity to the docs and tests.

For bufferTimeout:

  1. I'd suggest adding a test which validates the Set against an upstream with N consecutive duplicate values, where N is higher than the prefetch size. If there is no issue here, let's proceed with the next steps; otherwise we should seek to explain the failures (just like some operators state that an accepted Publisher needs to be finite -> we are sometimes not at liberty to limit implementations by type but only by the specification).
  2. Please document in the Javadoc that for Collections that are capable of returning false from add() the buffers can be smaller than the expected buffer size.

For buffer:

  1. Please also address the other Subscriber implementations. You touched BufferExactSubscriber, but we also have BufferSkipSubscriber and BufferOverlappingSubscriber.
  2. We need more test cases.

I understand this requires more work so please let me know if you're still keen to contribute. I'd just like us to have a consistent UX across similar operators and that requires a holistic approach. I'll be away for a week but if you make any progress, please do commit and I'll review the changes when I'm back.

Thanks again @Sage-Pierce and I look forward to where this discussion leads us :)

@Sage-Pierce
Author

@chemicL I don't mind taking a stab at all of that 😄 May take some time, but may have an updated review next week.

spierce added 2 commits June 24, 2024 07:55
- Make `Collection` behavior consistent among all FluxBuffer* operators
- Added several more tests for all FluxBuffer* operators covering usage of `Set`
@Sage-Pierce
Author

@chemicL I believe I have addressed your feedback, and I look forward to your re-review when you return. I will be on vacation for the first half of July, so it may take me a bit to follow up on further feedback.

Successfully merging this pull request may close these issues.

FluxBuffer hangs when buffer Supplier returns a Set
2 participants