FluxBuffer hangs when buffer Supplier returns a Set #3821

Open
Sage-Pierce opened this issue Jun 1, 2024 · 0 comments · May be fixed by #3822

Sage-Pierce commented Jun 1, 2024

If Flux.buffer(int maxSize, Supplier<Collection> bufferSupplier) is invoked with a bufferSupplier that returns a Set, the stream may hang when upstream emits duplicate elements and downstream demand is bounded.

Extra Context

I have a use case where I'd like to process batches of n distinct elements from a source at a time, and using Flux.buffer(n, LinkedHashSet::new) would be a convenient, concise way to do so.

Expected Behavior

The stream should not hang and should complete successfully

Actual Behavior

The stream hangs and never completes

Steps to Reproduce

The following test fails: verification times out after 2 seconds because the stream never completes.

	@Test
	public void supplierUsesSet() {
		Flux.just(1, 1, 2)
			.<Set<Integer>>buffer(2, HashSet::new)
			.take(1, true)
			.as(StepVerifier::create)
			.expectNext(Stream.of(1, 2).collect(Collectors.toSet()))
			.expectComplete()
			.verify(Duration.ofSeconds(2));
	}

If the test is changed to not contain duplicates, i.e. Flux.just(1, 2), the test passes.

Possible Solution

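FluxBuffer does not check whether adding an element actually modified the buffer. If Collection.add returns false (the buffer is unchanged, as with a duplicate added to a Set), an extra s.request(1) should be issued to replace the element that was consumed without growing the buffer.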
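
To make the demand accounting concrete, here is a simplified, single-threaded model of an exact-size buffer stage in plain Java. It is a sketch only and is not FluxBuffer's actual code: the class name BufferDemandSketch, the run method, and the replenishOnNoOp flag are all hypothetical, and it assumes one downstream request translates to maxSize upstream requests. An empty result while source items remain corresponds to the hang.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Supplier;

public class BufferDemandSketch {

    /**
     * Hypothetical model of an exact-size buffer stage (not Reactor code).
     * Returns the buffers emitted for the given downstream demand. If the
     * loop stops with demand exhausted and source items left over, the
     * real operator would be waiting forever: no demand, no completion.
     */
    static <T> List<Collection<T>> run(List<T> source,
                                       int maxSize,
                                       Supplier<Collection<T>> bufferSupplier,
                                       long downstreamDemand,
                                       boolean replenishOnNoOp) {
        List<Collection<T>> emitted = new ArrayList<>();
        // Assumption: each requested buffer is translated to maxSize items.
        long upstreamDemand = downstreamDemand * maxSize;
        Collection<T> buffer = null;
        int index = 0;

        while (upstreamDemand > 0 && index < source.size()) {
            T item = source.get(index++);
            upstreamDemand--;                 // one requested item consumed

            if (buffer == null) {
                buffer = bufferSupplier.get();
            }
            boolean modified = buffer.add(item);
            if (!modified && replenishOnNoOp) {
                upstreamDemand++;             // models the proposed extra request(1)
            }
            if (buffer.size() == maxSize) {   // buffer is full: emit it downstream
                emitted.add(buffer);
                buffer = null;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 1, 2);
        // Without replenishing: two items consumed, Set holds only {1}, nothing emitted.
        System.out.println(run(source, 2, LinkedHashSet::new, 1, false)); // prints []
        // With replenishing: the duplicate triggers one more request, so {1, 2} is emitted.
        System.out.println(run(source, 2, LinkedHashSet::new, 1, true));  // prints [[1, 2]]
    }
}
```

In the first call the duplicate 1 uses up the second unit of demand without growing the Set, so the element 2 is never requested. That mirrors the failing test above, where take(1, true) bounds the downstream request to a single buffer.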

Your Environment

  • Reactor version(s) used: 3.7.0-SNAPSHOT
  • Other relevant libraries versions (eg. netty, ...): N/A
  • JVM version (java -version): 17.0.8
  • OS and version (eg uname -a): MacOS Darwin 23.5.0
Sage-Pierce pushed a commit to Sage-Pierce/reactor-core that referenced this issue Jun 1, 2024
Sage-Pierce pushed a commit to Sage-Pierce/reactor-core that referenced this issue Jun 19, 2024
Sage-Pierce pushed a commit to Sage-Pierce/reactor-core that referenced this issue Jun 22, 2024
- Make `Collection` behavior consistent among all FluxBuffer* operators
- Added several more tests for all FluxBuffer* operators covering usage of `Set`
Sage-Pierce pushed a commit to Sage-Pierce/reactor-core that referenced this issue Jun 24, 2024