From 9e38362ed38832caa21340ba755a13b20eb1be53 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Tue, 16 Jan 2024 13:26:51 +0800 Subject: [PATCH] feat: Add expectNextN to StreamTestKit. (#962) --- .../pekko/stream/testkit/StreamTestKit.scala | 15 +++++++++++++-- .../pekko/stream/testkit/StreamTestKitSpec.scala | 12 +++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala index 520c11cf69a..fe10aee6246 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala @@ -23,9 +23,8 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import org.apache.pekko -import pekko.actor.ClassicActorSystemProvider -import org.reactivestreams.{ Publisher, Subscriber, Subscription } import pekko.actor.{ ActorRef, ActorSystem, DeadLetterSuppression, NoSerializationVerificationNeeded } +import pekko.actor.ClassicActorSystemProvider import pekko.stream._ import pekko.stream.impl._ import pekko.testkit.{ TestActor, TestProbe } @@ -33,6 +32,8 @@ import pekko.testkit.TestActor.AutoPilot import pekko.util.JavaDurationConverters import pekko.util.ccompat._ +import org.reactivestreams.{ Publisher, Subscriber, Subscription } + /** * Provides factory methods for various Publishers. */ @@ -462,6 +463,16 @@ object TestSubscriber { self } + /** + * Fluent DSL + * Expect the given elements to be signalled in order. + * @since 1.1.0 + */ + def expectNextN(elems: java.util.List[I]): Self = { + elems.forEach(e => probe.expectMsg(OnNext(e))) + self + } + /** * Fluent DSL * Expect the given elements to be signalled in any order. diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala index edeb7fc225f..37ea49c0715 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala @@ -14,16 +14,15 @@ package org.apache.pekko.stream.testkit import scala.concurrent.duration._ - import org.apache.pekko import pekko.stream.scaladsl.Source import pekko.stream.testkit.scaladsl.TestSink - import pekko.testkit._ - import pekko.testkit.TestEvent.Mute import pekko.testkit.TestEvent.UnMute +import java.util + class StreamTestKitSpec extends PekkoSpec { val ex = new Exception("Boom!") @@ -199,5 +198,12 @@ class StreamTestKitSpec extends PekkoSpec { "#expectNextN given specific elements" in { Source(1 to 4).runWith(TestSink.probe).request(4).expectNextN(4) should ===(List(1, 2, 3, 4)) } + + "#expectNextN given specific elements for java list" in { + Source(1 to 4).runWith(TestSink[Int]()) + .request(4) + .expectNextN(util.Arrays.asList(1, 2, 3, 4)) + .expectComplete() + } } }