Skip to content

Commit

Permalink
[#3821] Update behavior of FluxBufferSkip and FluxBufferOverlap
Browse files Browse the repository at this point in the history
- Make `Collection` behavior consistent among all FluxBuffer* operators
- Added several more tests for all FluxBuffer* operators covering usage of `Set`
  • Loading branch information
spierce committed Jun 24, 2024
1 parent d601552 commit b464df3
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 27 deletions.
35 changes: 21 additions & 14 deletions reactor-core/src/main/java/reactor/core/publisher/FluxBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,11 @@ public void onNext(T t) {
}

if (b != null) {
b.add(t);
if (b.size() == size) {
if (!b.add(t)) {
Operators.onDiscard(t, this.ctx);
s.request(1);
return;
} else if (b.size() == size) {
buffer = null;
actual.onNext(b);
}
Expand Down Expand Up @@ -478,6 +481,19 @@ public void onNext(T t) {
return;
}

C b0 = peek();

for (C b : this) {
// It should never be the case that an element can be added to the first open
// buffer and not all of them. Otherwise, the buffer behavior is non-deterministic,
// and this operator's behavior is undefined.
if (!b.add(t) && b == b0) {
Operators.onDiscard(t, actual.currentContext());
s.request(1);
return;
}
}

long i = index;

if (i % skip == 0L) {
Expand All @@ -494,25 +510,16 @@ public void onNext(T t) {
return;
}

b.add(t);
offer(b);
}

C b = peek();

if (b != null && b.size() + 1 == size) {
if (b0 != null && b0.size() == size) {
poll();

b.add(t);

actual.onNext(b);

actual.onNext(b0);
produced++;
}

for (C b0 : this) {
b0.add(t);
}

index = i + 1;
}

Expand Down
107 changes: 94 additions & 13 deletions reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.CsvSource;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -413,18 +416,6 @@ public void supplierThrows() {
.assertNotComplete();
}

@Test
public void supplierUsesSet() {
Flux.just(1, 1, 2)
.<Set<Integer>>buffer(2, HashSet::new)
.take(1, true)
.as(StepVerifier::create)
.expectNext(Stream.of(1, 2).collect(Collectors.toSet()))
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2))
.hasDiscardedExactly(1);
}

@Test
public void bufferWillSubdivideAnInputFlux() {
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8);
Expand Down Expand Up @@ -677,4 +668,94 @@ public void discardOnErrorOverlap() {
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 3); //we already opened a 2nd buffer
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2;1, ",
"1|2|1|3, 1|2;1|3, ",
"1|1|2|3, 1|2;3, 1",
"2|1|1|3, 2|1;1|3, "
})
public void bufferExactSupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2, ",
"1|2|1|3, 1|2;3, 1",
"1|2|1|1|3, 1|2;1|3, 1",
"1|1|2|3, 1|2, 1",
"2|1|1|3, 2|1;3, 1"
})
public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(2, 3, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.thenCancel()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}

@ParameterizedTestWithName
@CsvSource({
"1|2, 1|2, ",
"1|1|1, 1, 1|1",
"1|1|2, 1|2, 1",
"1|2|1, 1|2, 1",
"1|2|1|3, 1|2|3;3, 1",
"1|2|1|3|3|4, 1|2|3|4;3|4, 1|3",
"1|1|2|3, 1|2|3;3, 1",
"2|1|1|3, 2|1|3;3, 1",
"1|2|1|2|3, 1|2|3;3, 1|2"
})
public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) {
List<Set<Object>> outputs = Arrays.stream(output.split(";"))
.map(it -> Arrays.<Object>stream(it.split("\\|")).collect(Collectors.toSet()))
.collect(Collectors.toList());

StepVerifier.Assertions assertions = Flux.just(input.split("\\|"))
.<Collection<Object>>buffer(4, 2, HashSet::new)
.as(it -> StepVerifier.create(it, outputs.size()))
.expectNextSequence(outputs)
.expectComplete()
.verifyThenAssertThat(Duration.ofSeconds(2));

if (discard == null) {
assertions.hasNotDiscardedElements();
} else {
assertions.hasDiscardedExactly((Object[]) discard.split("\\|"));
}
}
}

0 comments on commit b464df3

Please sign in to comment.