From 9d69ae1fc602b2e6cccdce8f1165b3ced96bac8d Mon Sep 17 00:00:00 2001 From: spierce Date: Sun, 23 Jun 2024 21:49:40 -0500 Subject: [PATCH] [#3821] Add test to `bufferTimeout` to verify `Set` bufferSupplier --- .../core/publisher/FluxBufferTest.java | 32 +++++++++---------- .../core/publisher/FluxBufferTimeoutTest.java | 11 +++++++ 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java index 53dea65657..818ba207a4 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTest.java @@ -692,8 +692,8 @@ public void bufferExactSupplierUsesSet(String input, String output, @Nullable St .verifyThenAssertThat(Duration.ofSeconds(2)); if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { + assertions.hasNotDiscardedElements(); + } else { assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); } } @@ -716,16 +716,16 @@ public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) .>buffer(2, 3, HashSet::new) - .as(it -> StepVerifier.create(it, outputs.size())) + .as(it -> StepVerifier.create(it, outputs.size())) .expectNextSequence(outputs) - .thenCancel() + .thenCancel() .verifyThenAssertThat(Duration.ofSeconds(2)); - if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { - assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); - } + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } } @ParameterizedTestWithName @@ -738,7 +738,7 @@ public void bufferSkipWithMax2Skip3SupplierUsesSet(String input, String output, "1|2|1|3|3|4, 1|2|3|4;3|4, 1|3", "1|1|2|3, 1|2|3;3, 1", "2|1|1|3, 2|1|3;3, 1", - "1|2|1|2|3, 1|2|3;3, 1|2" + "1|2|1|2|3, 1|2|3;3, 1|2" }) public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String output, @Nullable String discard) { List> outputs = Arrays.stream(output.split(";")) @@ -747,15 +747,15 @@ public void bufferOverlapWithMax4Skip2SupplierUsesSet(String input, String outpu StepVerifier.Assertions assertions = Flux.just(input.split("\\|")) .>buffer(4, 2, HashSet::new) - .as(it -> StepVerifier.create(it, outputs.size())) + .as(it -> StepVerifier.create(it, outputs.size())) .expectNextSequence(outputs) .expectComplete() .verifyThenAssertThat(Duration.ofSeconds(2)); - if (discard == null) { - assertions.hasNotDiscardedElements(); - } else { - assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); - } + if (discard == null) { + assertions.hasNotDiscardedElements(); + } else { + assertions.hasDiscardedExactly((Object[]) discard.split("\\|")); + } } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java index e750a5df96..3cc9006e6c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -421,4 +422,14 @@ public void discardOnError() { .verifyThenAssertThat() .hasDiscardedExactly(1, 2, 3); } + + @Test + public void bufferSupplierUsesSet() { + Flux.just(1, 1, 1, 1, 1, 1, 1) + .>bufferTimeout(3, Duration.ofSeconds(2), HashSet::new) + .as(it -> StepVerifier.create(it, 3)) + .expectNext(Collections.singleton(1), Collections.singleton(1), Collections.singleton(1)) + .expectComplete() + .verify(Duration.ofSeconds(2)); + } }