diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
new file mode 100644
index 00000000000..015d149e99b
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
@@ -0,0 +1,69 @@
+# mapWithResource
+
+Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" }
+1. `create`: Open or Create the resource.
+2. `f`: Transform each element inputs with the help of resource.
+3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.
+
+## Description
+
+Transform each stream element with the help of a resource.
+The functions are by default called on Pekko's dispatcher for blocking IO to avoid interfering with other stream operations.
+See @ref:[Blocking Needs Careful Management](../../../typed/dispatchers.md#blocking-needs-careful-management) for an explanation on why this is important.
+The resource creation function is invoked once when the stream is materialized and the returned resource is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream. The returned T MUST NOT be null as it is illegal as stream element - according to the Reactive Streams specification.
+
+The close function is called when upstream or downstream completes normally or exceptionally, and will be called only once.
+- upstream completes or fails, the optional value returns by `close` will be emitted to downstream if defined.
+- downstream cancels or fails, the optional value returns by `close` will be ignored.
+- shutdowns abruptly, the optional value returns by `close` will be ignored.
+ You can do some clean-up here.
+
+Early completion can be done with combination of the @apidoc[Flow.takeWhile](Flow) operator.
+
+See also @ref:[unfoldResource](../Source/unfoldResource.md), @ref:[unfoldResourceAsync](../Source/unfoldResourceAsync.md).
+
+You can configure the default dispatcher for this Source by changing the `org.apache.pekko.stream.materializer.blocking-io-dispatcher`
+or set it for a given Source by using ActorAttributes.
+
+## Examples
+
+Imagine we have a database API which may potentially block when we perform a query,
+and the database connection can be reused for each query.
+
+Scala
+: @@snip [UnfoldResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource-blocking-api }
+
+Java
+: @@snip [UnfoldResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource-blocking-api }
+
+Let's see how we make use of the API above safely through `mapWithResource`:
+
+Scala
+: @@snip [UnfoldResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource }
+
+Java
+: @@snip [UnfoldResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource }
+
+In this example we retrieve data form two tables with the same shared connection, and transform the results
+to individual records with @scala[`mapConcat(identity)`]@java[`mapConcat(elems -> elems)`], once done the connection is closed.
+
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the mapping function returns an element and downstream is ready to consume it
+
+**backpressures** downstream backpressures
+
+**completes** upstream completes
+
+**cancels** downstream cancels
+
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 7504329bcf6..5f2d73c4640 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -171,6 +171,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
+|Source/Flow|@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|Source/Flow|@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
@@ -522,6 +523,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
* [mapConcat](Source-or-Flow/mapConcat.md)
* [mapError](Source-or-Flow/mapError.md)
+* [mapWithResource](Source-or-Flow/mapWithResource.md)
* [maybe](Source/maybe.md)
* [merge](Source-or-Flow/merge.md)
* [mergeAll](Source-or-Flow/mergeAll.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java b/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java
new file mode 100644
index 00000000000..02c9ba85945
--- /dev/null
+++ b/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package jdocs.stream.operators.sourceorflow;
+
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.stream.javadsl.Source;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+public interface MapWithResource {
+ // #mapWithResource-blocking-api
+ interface DBDriver {
+ Connection create(URL url, String userName, String password);
+ }
+
+ interface Connection {
+ void close();
+ }
+
+ interface Database {
+ // blocking query
+ QueryResult doQuery(Connection connection, String query);
+ }
+
+ interface QueryResult {
+ boolean hasMore();
+
+ // potentially blocking retrieval of each element
+ DatabaseRecord next();
+
+ // potentially blocking retrieval all element
+ List toList();
+ }
+
+ interface DatabaseRecord {}
+ // #mapWithResource-blocking-api
+
+ default void mapWithResourceExample() {
+ final ActorSystem system = null;
+ final URL url = null;
+ final String userName = "Akka";
+ final String password = "Hakking";
+ final DBDriver dbDriver = null;
+ // #mapWithResource
+ // some database for JVM
+ final Database db = null;
+ Source.from(
+ Arrays.asList(
+ "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
+ "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
+ .mapWithResource(
+ () -> dbDriver.create(url, userName, password),
+ (connection, query) -> db.doQuery(connection, query).toList(),
+ connection -> {
+ connection.close();
+ return Optional.empty();
+ })
+ .mapConcat(elems -> elems)
+ .runForeach(System.out::println, system);
+ // #mapWithResource
+ }
+}
diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala
new file mode 100644
index 00000000000..bc866244cfe
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.sourceorflow
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.stream.scaladsl.Source
+
+import java.net.URL
+
+object MapWithResource {
+ implicit val actorSystem: ActorSystem = ???
+
+ // #mapWithResource-blocking-api
+ trait DBDriver {
+ def create(url: URL, userName: String, password: String): Connection
+ }
+ trait Connection {
+ def close(): Unit
+ }
+ trait Database {
+ // blocking query
+ def doQuery(connection: Connection, query: String): QueryResult = ???
+ }
+ trait QueryResult {
+ def hasMore: Boolean
+ // potentially blocking retrieval of each element
+ def next(): DataBaseRecord
+ // potentially blocking retrieval all element
+ def toList(): List[DataBaseRecord]
+ }
+ trait DataBaseRecord
+ // #mapWithResource-blocking-api
+ val url: URL = ???
+ val userName = "Akka"
+ val password = "Hakking"
+ val dbDriver: DBDriver = ???
+ def mapWithResourceExample(): Unit = {
+ // #mapWithResource
+ // some database for JVM
+ val db: Database = ???
+ Source(
+ List(
+ "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
+ "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
+ .mapWithResource(() => dbDriver.create(url, userName, password))(
+ (connection, query) => db.doQuery(connection, query).toList(),
+ conn => {
+ conn.close()
+ None
+ })
+ .mapConcat(identity)
+ .runForeach(println)
+ // #mapWithResource
+ }
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index aa5f08a65d3..9704046ba1e 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -36,6 +36,7 @@
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -216,6 +217,26 @@ public void mustBeAbleToUseStatefulMap() throws Exception {
Assert.assertEquals("[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
+ @Test
+ public void mustBeAbleToUseMapWithResource() {
+ final AtomicBoolean gate = new AtomicBoolean(true);
+ Source.from(Arrays.asList("1", "2", "3"))
+ .via(
+ Flow.of(String.class)
+ .mapWithResource(
+ () -> "resource",
+ (resource, elem) -> elem,
+ (resource) -> {
+ gate.set(false);
+ return Optional.of("end");
+ }))
+ .runWith(TestSink.create(system), system)
+ .request(4)
+ .expectNext("1", "2", "3", "end")
+ .expectComplete();
+ Assert.assertFalse(gate.get());
+ }
+
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index d0be5c938ca..68345bc7398 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -46,6 +46,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -796,6 +797,24 @@ public void mustBeAbleToUseStatefulMapAsDistinctUntilChanged() throws Exception
Assert.assertEquals("12345", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
+ @Test
+ public void mustBeAbleToUseMapWithResource() {
+ final AtomicBoolean gate = new AtomicBoolean(true);
+ Source.from(Arrays.asList("1", "2", "3"))
+ .mapWithResource(
+ () -> "resource",
+ (resource, elem) -> elem,
+ (resource) -> {
+ gate.set(false);
+ return Optional.of("end");
+ })
+ .runWith(TestSink.create(system), system)
+ .request(4)
+ .expectNext("1", "2", "3", "end")
+ .expectComplete();
+ Assert.assertFalse(gate.get());
+ }
+
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
new file mode 100644
index 00000000000..32eae23712c
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import java.io.BufferedReader
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.annotation.{ nowarn, tailrec }
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.{ Await, Promise }
+import scala.concurrent.duration.DurationInt
+import scala.util.Success
+import scala.util.control.NoStackTrace
+
+import com.google.common.jimfs.{ Configuration, Jimfs }
+
+import org.apache.pekko
+import pekko.Done
+import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer }
+import pekko.stream.ActorAttributes.supervisionStrategy
+import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
+import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
+import pekko.stream.impl.StreamSupervisor.Children
+import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
+import pekko.stream.testkit.Utils.{ assertDispatcher, TE, UnboundedMailboxConfig }
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
+import pekko.testkit.EventFilter
+import pekko.util.ByteString
+
+class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
+
+ private val fs = Jimfs.newFileSystem("MapWithResourceSpec", Configuration.unix())
+ private val ex = new Exception("TEST") with NoStackTrace
+
+ private val manyLines = {
+ ("a" * 100 + "\n") * 10 +
+ ("b" * 100 + "\n") * 10 +
+ ("c" * 100 + "\n") * 10 +
+ ("d" * 100 + "\n") * 10 +
+ ("e" * 100 + "\n") * 10 +
+ ("f" * 100 + "\n") * 10
+ }
+ private val manyLinesArray = manyLines.split("\n")
+
+ private val manyLinesPath = {
+ val file = Files.createFile(fs.getPath("/testMapWithResource.dat"))
+ Files.write(file, manyLines.getBytes(StandardCharsets.UTF_8))
+ }
+ private def newBufferedReader() = Files.newBufferedReader(manyLinesPath, StandardCharsets.UTF_8)
+
+ private def readLines(reader: BufferedReader, maxCount: Int): List[String] = {
+ if (maxCount == 0) {
+ return List.empty
+ }
+
+ @tailrec
+ def loop(builder: ListBuffer[String], n: Int): ListBuffer[String] = {
+ if (n == 0) {
+ builder
+ } else {
+ val line = reader.readLine()
+ if (line eq null)
+ builder
+ else {
+ builder += line
+ loop(builder, n - 1)
+ }
+ }
+ }
+ loop(ListBuffer.empty, maxCount).result()
+ }
+
+ "MapWithResource" must {
+ "can read contents from a file" in {
+ val p = Source(List(1, 10, 20, 30))
+ .mapWithResource(() => newBufferedReader())((reader, count) => {
+ readLines(reader, count)
+ },
+ reader => {
+ reader.close()
+ None
+ })
+ .mapConcat(identity)
+ .runWith(Sink.asPublisher(false))
+
+ val c = TestSubscriber.manualProbe[String]()
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ val chunks = manyLinesArray.toList.iterator
+
+ sub.request(1)
+ c.expectNext() should ===(chunks.next())
+ sub.request(1)
+ c.expectNext() should ===(chunks.next())
+ c.expectNoMessage(300.millis)
+
+ while (chunks.hasNext) {
+ sub.request(1)
+ c.expectNext() should ===(chunks.next())
+ }
+ sub.request(1)
+
+ c.expectComplete()
+ }
+
+ "continue when Strategy is Resume and exception happened" in {
+ val p = Source
+ .repeat(1)
+ .take(100)
+ .mapWithResource(() => newBufferedReader())((reader, _) => {
+ val s = reader.readLine()
+ if (s != null && s.contains("b")) throw TE("") else s
+ },
+ reader => {
+ reader.close()
+ None
+ })
+ .withAttributes(supervisionStrategy(resumingDecider))
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[String]()
+
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ (0 to 49).foreach(i => {
+ sub.request(1)
+ c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10))
+ })
+ sub.request(1)
+ c.expectComplete()
+ }
+
+ "close and open stream again when Strategy is Restart" in {
+ val p = Source
+ .repeat(1)
+ .take(100)
+ .mapWithResource(() => newBufferedReader())((reader, _) => {
+ val s = reader.readLine()
+ if (s != null && s.contains("b")) throw TE("") else s
+ },
+ reader => {
+ reader.close()
+ None
+ })
+ .withAttributes(supervisionStrategy(restartingDecider))
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[String]()
+
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ (0 to 19).foreach(_ => {
+ sub.request(1)
+ c.expectNext() should ===(manyLinesArray(0))
+ })
+ sub.cancel()
+ }
+
+ "work with ByteString as well" in {
+ val chunkSize = 50
+ val buffer = new Array[Char](chunkSize)
+ val p = Source
+ .repeat(1)
+ .mapWithResource(() => newBufferedReader())((reader, _) => {
+ val s = reader.read(buffer)
+ if (s > 0) Some(ByteString(buffer.mkString("")).take(s)) else None
+ },
+ reader => {
+ reader.close()
+ None
+ })
+ .takeWhile(_.isDefined)
+ .collect {
+ case Some(bytes) => bytes
+ }
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[ByteString]()
+
+ var remaining = manyLines
+ def nextChunk() = {
+ val (chunk, rest) = remaining.splitAt(chunkSize)
+ remaining = rest
+ chunk
+ }
+
+ p.subscribe(c)
+ val sub = c.expectSubscription()
+
+ (0 to 121).foreach(_ => {
+ sub.request(1)
+ c.expectNext().utf8String should ===(nextChunk())
+ })
+ sub.request(1)
+ c.expectComplete()
+ }
+
+ "use dedicated blocking-io-dispatcher by default" in {
+ val p = Source
+ .single(1)
+ .mapWithResource(() => newBufferedReader())((reader, _) => Option(reader.readLine()),
+ reader => {
+ reader.close()
+ None
+ })
+ .runWith(TestSink.probe)
+
+ SystemMaterializer(system).materializer
+ .asInstanceOf[PhasedFusingActorMaterializer]
+ .supervisor
+ .tell(StreamSupervisor.GetChildren, testActor)
+ val ref = expectMsgType[Children].children.find(_.path.toString contains "mapWithResource").get
+ try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
+ finally p.cancel()
+ }
+
+ "fail when create throws exception" in {
+ EventFilter[TE](occurrences = 1).intercept {
+ val p = Source
+ .single(1)
+ .mapWithResource[BufferedReader, String](() => throw TE(""))((reader, _) => reader.readLine(),
+ reader => {
+ reader.close()
+ None
+ })
+ .runWith(Sink.asPublisher(false))
+ val c = TestSubscriber.manualProbe[String]()
+ p.subscribe(c)
+
+ c.expectSubscription()
+ c.expectError(TE(""))
+ }
+ }
+
+ "fail when close throws exception" in {
+ val (pub, sub) = TestSource[Int]()
+ .mapWithResource(() => Iterator("a"))((it, _) => if (it.hasNext) Some(it.next()) else None, _ => throw TE(""))
+ .collect { case Some(line) => line }
+ .toMat(TestSink())(Keep.both)
+ .run()
+
+ pub.ensureSubscription()
+ sub.ensureSubscription()
+ sub.request(1)
+ pub.sendNext(1)
+ sub.expectNext("a")
+ pub.sendComplete()
+ sub.expectError(TE(""))
+ }
+
+ "not close the resource twice when read fails" in {
+ val closedCounter = new AtomicInteger(0)
+ val probe = Source
+ .repeat(1)
+ .mapWithResource(() => 23)( // the best resource there is
+ (_, _) => throw TE("failing read"),
+ _ => {
+ closedCounter.incrementAndGet()
+ None
+ })
+ .runWith(TestSink.probe[Int])
+
+ probe.request(1)
+ probe.expectError(TE("failing read"))
+ closedCounter.get() should ===(1)
+ }
+
+ "not close the resource twice when read fails and then close fails" in {
+ val closedCounter = new AtomicInteger(0)
+ val probe = Source
+ .repeat(1)
+ .mapWithResource(() => 23)((_, _) => throw TE("failing read"),
+ _ => {
+ closedCounter.incrementAndGet()
+ if (closedCounter.get == 1) throw TE("boom")
+ None
+ })
+ .runWith(TestSink.probe[Int])
+
+ EventFilter[TE](occurrences = 1).intercept {
+ probe.request(1)
+ probe.expectError(TE("boom"))
+ }
+ closedCounter.get() should ===(1)
+ }
+
+ "will close the resource when upstream complete" in {
+ val closedCounter = new AtomicInteger(0)
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
+ reader => {
+ reader.close()
+ closedCounter.incrementAndGet()
+ Some(List("End"))
+ })
+ .mapConcat(identity)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ sub.expectSubscription().request(2)
+ pub.sendNext(1)
+ sub.expectNext(manyLinesArray(0))
+ pub.sendComplete()
+ sub.expectNext("End")
+ sub.expectComplete()
+ closedCounter.get shouldBe 1
+ }
+
+ "will close the resource when upstream fail" in {
+ val closedCounter = new AtomicInteger(0)
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
+ reader => {
+ reader.close()
+ closedCounter.incrementAndGet()
+ Some(List("End"))
+ })
+ .mapConcat(identity)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ sub.expectSubscription().request(2)
+ pub.sendNext(1)
+ sub.expectNext(manyLinesArray(0))
+ pub.sendError(ex)
+ sub.expectNext("End")
+ sub.expectError(ex)
+ closedCounter.get shouldBe 1
+ }
+
+ "will close the resource when downstream cancel" in {
+ val closedCounter = new AtomicInteger(0)
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
+ reader => {
+ reader.close()
+ closedCounter.incrementAndGet()
+ Some(List("End"))
+ })
+ .mapConcat(identity)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ val subscription = sub.expectSubscription()
+ subscription.request(2)
+ pub.sendNext(1)
+ sub.expectNext(manyLinesArray(0))
+ subscription.cancel()
+ pub.expectCancellation()
+ closedCounter.get shouldBe 1
+ }
+
+ "will close the resource when downstream fail" in {
+ val closedCounter = new AtomicInteger(0)
+ val (pub, sub) = TestSource
+ .probe[Int]
+ .mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
+ reader => {
+ reader.close()
+ closedCounter.incrementAndGet()
+ Some(List("End"))
+ })
+ .mapConcat(identity)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ sub.request(2)
+ pub.sendNext(2)
+ sub.expectNext(manyLinesArray(0))
+ sub.expectNext(manyLinesArray(1))
+ sub.cancel(ex)
+ pub.expectCancellationWithCause(ex)
+ closedCounter.get shouldBe 1
+ }
+
+ "will close the resource on abrupt materializer termination" in {
+ @nowarn("msg=deprecated")
+ val mat = ActorMaterializer()
+ val promise = Promise[Done]()
+ val matVal = Source
+ .single(1)
+ .mapWithResource(() => {
+ newBufferedReader()
+ })((reader, count) => readLines(reader, count),
+ reader => {
+ reader.close()
+ promise.complete(Success(Done))
+ Some(List("End"))
+ })
+ .mapConcat(identity)
+ .runWith(Sink.never)(mat)
+ mat.shutdown()
+ matVal.failed.futureValue shouldBe an[AbruptTerminationException]
+ Await.result(promise.future, 3.seconds) shouldBe Done
+ }
+
+ }
+ override def afterTermination(): Unit = {
+ fs.close()
+ }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 13ecb94a6b4..d2cf599f3c8 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -44,6 +44,7 @@ import pekko.stream.Attributes._
val mapAsyncUnordered = name("mapAsyncUnordered")
val mapAsyncPartition = name("mapAsyncPartition")
val mapAsyncPartitionUnordered = name("mapAsyncPartitionUnordered")
+ val mapWithResource = name("mapWithResource") and IODispatcher
val ask = name("ask")
val grouped = name("grouped")
val groupedWithin = name("groupedWithin")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index cd8e26de59e..468f287d365 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2225,7 +2225,7 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
private val out = Outlet[Out]("StatefulMap.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
- override protected def initialAttributes: Attributes = DefaultAttributes.statefulMap and SourceLocation.forLambda(f)
+ override protected def initialAttributes: Attributes = Attributes(SourceLocation.forLambda(f))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index e568a1e982c..58566f2d2ec 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -787,6 +787,47 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
+ /**
+ * Transform each stream element with the help of a resource.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
+ * the mapping function for mapping the first element. The mapping function returns a mapped element to emit
+ * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
+ *
+ * The `close` function is called only once when the upstream or downstream finishes or fails. You can do some clean-up here,
+ * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
+ * set it for a given Source by using [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @param close function that closes the resource, optionally outputting a last element
+ * @since 1.1.0
+ */
+ def mapWithResource[R, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T],
+ close: function.Function[R, Optional[T]]): javadsl.Flow[In, T, Mat] =
+ new Flow(
+ delegate.mapWithResource(() => create.create())(
+ (resource, out) => f(resource, out),
+ resource => close.apply(resource).toScala))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index e7c4aacef4c..7396f370e29 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2500,6 +2500,47 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
+ /**
+ * Transform each stream element with the help of a resource.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
+ * the mapping function for mapping the first element. The mapping function returns a mapped element to emit
+ * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
+ *
+ * The `close` function is called only once when the upstream or downstream finishes or fails. You can do some clean-up here,
+ * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
+ * set it for a given Source by using [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @param close function that closes the resource, optionally outputting a last element
+ * @since 1.1.0
+ */
+ def mapWithResource[R, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T],
+ close: function.Function[R, Optional[T]]): javadsl.Source[T, Mat] =
+ new Source(
+ delegate.mapWithResource(() => create.create())(
+ (resource, out) => f(resource, out),
+ resource => close.apply(resource).toScala))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index eee009f8242..dab7481fcca 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -244,6 +244,47 @@ class SubFlow[In, Out, Mat](
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
+ /**
+ * Transform each stream element with the help of a resource.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
+ * the mapping function for mapping the first element. The mapping function returns a mapped element to emit
+ * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
+ *
+ * The `close` function is called only once when the upstream or downstream finishes or fails. You can do some clean-up here,
+ * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
+ * set it for a given Source by using [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @param close function that closes the resource, optionally outputting a last element
+ * @since 1.1.0
+ */
+ def mapWithResource[R, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T],
+ close: function.Function[R, Optional[T]]): javadsl.SubFlow[In, T, Mat] =
+ new SubFlow(
+ delegate.mapWithResource(() => create.create())(
+ (resource, out) => f(resource, out),
+ resource => close.apply(resource).toScala))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index eb7e4d38a6f..eb119256960 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -235,6 +235,47 @@ class SubSource[Out, Mat](
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
+ /**
+ * Transform each stream element with the help of a resource.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
+ * the mapping function for mapping the first element. The mapping function returns a mapped element to emit
+ * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
+ *
+ * The `close` function is called only once when the upstream or downstream finishes or fails. You can do some clean-up here,
+ * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
+ * set it for a given Source by using [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @param close function that closes the resource, optionally outputting a last element
+ * @since 1.1.0
+ */
+ def mapWithResource[R, T](
+ create: function.Creator[R],
+ f: function.Function2[R, Out, T],
+ close: function.Function[R, Optional[T]]): javadsl.SubSource[T, Mat] =
+ new SubSource(
+ delegate.mapWithResource(() => create.create())(
+ (resource, out) => f(resource, out),
+ resource => close.apply(resource).toScala))
+
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index fef56ceda6d..39c42f6f279 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1100,7 +1100,47 @@ trait FlowOps[+Out, +Mat] {
* @param onComplete a function that transforms the ongoing state into an optional output element
*/
def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] =
- via(new StatefulMap[S, Out, T](create, f, onComplete))
+ via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap))
+
+ /**
+ * Transform each stream element with the help of a resource.
+ *
+ * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
+ * the mapping function for mapping the first element. The mapping function returns a mapped element to emit
+ * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
+ *
+ * The `close` function is called only once when the upstream or downstream finishes or fails. You can do some clean-up here,
+ * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped.
+ *
+ * Early completion can be done with combination of the [[takeWhile]] operator.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or
+ * set it for a given Source by using [[ActorAttributes]].
+ *
+ * '''Emits when''' the mapping function returns an element and downstream is ready to consume it
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @tparam R the type of the resource
+ * @tparam T the type of the output elements
+ * @param create function that creates the resource
+ * @param f function that transforms the upstream element and the resource to output element
+ * @param close function that closes the resource, optionally outputting a last element
+ * @since 1.1.0
+ */
+ def mapWithResource[R, T](create: () => R)(f: (R, Out) => T, close: R => Option[T]): Repr[T] =
+ via(
+ new StatefulMap[R, Out, T](
+ create,
+ (resource, out) => (resource, f(resource, out)),
+ resource => close(resource))
+ .withAttributes(DefaultAttributes.mapWithResource))
/**
* Transform each input element into an `Iterable` of output elements that is