Skip to content

Commit

Permalink
feat: Add mapWithResource stream operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 15, 2024
1 parent cf70478 commit 7021b33
Show file tree
Hide file tree
Showing 12 changed files with 845 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# mapWithResource

Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(java.util.function.Supplier,java.util.function.BiFunction,java.util.function.Function)" }
1. `create`: Open or Create the resource.
2. `f`: Transform each element inputs with the help of resource.
3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.

## Description

Transform each stream element with the help of a resource.
The functions are by default called on Pekko's dispatcher for blocking IO to avoid interfering with other stream operations.
See @ref:[Blocking Needs Careful Management](../../../typed/dispatchers.md#blocking-needs-careful-management) for an explanation on why this is important.
The resource creation function is invoked once when the stream is materialized and the returned resource is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream. The returned T MUST NOT be null as it is illegal as stream element - according to the Reactive Streams specification.

The close function is called when upstream or downstream completes normally or exceptionally, and will be called only once.
- upstream completes or fails, the optional value returns by `close` will be emitted to downstream if defined.
- downstream cancels or fails, the optional value returns by `close` will be ignored.
- shutdowns abruptly, the optional value returns by `close` will be ignored.
You can do some clean-up here.

Early completion can be done with combination of the @apidoc[Flow.takeWhile](Flow) operator.

See also @ref:[unfoldResource](../Source/unfoldResource.md), @ref:[unfoldResourceAsync](../Source/unfoldResourceAsync.md).

You can configure the default dispatcher for this Source by changing the `org.apache.pekko.stream.materializer.blocking-io-dispatcher`
or set it for a given Source by using ActorAttributes.

## Examples

Imagine we have a database API which may potentially block when we perform a query,
and the database connection can be reused for each query.

Scala
: @@snip [UnfoldResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource-blocking-api }

Java
: @@snip [UnfoldResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource-blocking-api }

Let's see how we make use of the API above safely through `mapWithResource`:

Scala
: @@snip [UnfoldResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource }

Java
: @@snip [UnfoldResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource }

In this example we retrieve data form two tables with the same shared connection, and transform the results
to individual records with @scala[`mapConcat(identity)`]@java[`mapConcat(elems -> elems)`], once done the connection is closed.


## Reactive Streams semantics

@@@div { .callout }

**emits** the mapping function returns an element and downstream is ready to consume it

**backpressures** downstream backpressures

**completes** upstream completes

**cancels** downstream cancels

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|Source/Flow|<a name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
Expand Down Expand Up @@ -522,6 +523,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
* [mapConcat](Source-or-Flow/mapConcat.md)
* [mapError](Source-or-Flow/mapError.md)
* [mapWithResource](Source-or-Flow/mapWithResource.md)
* [maybe](Source/maybe.md)
* [merge](Source-or-Flow/merge.md)
* [mergeAll](Source-or-Flow/mergeAll.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.sourceorflow;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Source;

import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public interface MapWithResource {
// #mapWithResource-blocking-api
interface DBDriver {
Connection create(URL url, String userName, String password);
}

interface Connection {
void close();
}

interface Database {
// blocking query
QueryResult doQuery(Connection connection, String query);
}

interface QueryResult {
boolean hasMore();

// potentially blocking retrieval of each element
DatabaseRecord next();

// potentially blocking retrieval all element
List<DatabaseRecord> toList();
}

interface DatabaseRecord {}
// #mapWithResource-blocking-api

default void mapWithResourceExample() {
final ActorSystem system = null;
final URL url = null;
final String userName = "Akka";
final String password = "Hakking";
final DBDriver dbDriver = null;
// #mapWithResource
// some database for JVM
final Database db = null;
Source.from(
Arrays.asList(
"SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
"SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
.mapWithResource(
() -> dbDriver.create(url, userName, password),
(connection, query) -> db.doQuery(connection, query).toList(),
connection -> {
connection.close();
return Optional.empty();
})
.mapConcat(elems -> elems)
.runForeach(System.out::println, system);
// #mapWithResource
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package docs.stream.operators.sourceorflow

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

import java.net.URL

object MapWithResource {
implicit val actorSystem: ActorSystem = ???

// #mapWithResource-blocking-api
trait DBDriver {
def create(url: URL, userName: String, password: String): Connection
}
trait Connection {
def close(): Unit
}
trait Database {
// blocking query
def doQuery(connection: Connection, query: String): QueryResult = ???
}
trait QueryResult {
def hasMore: Boolean
// potentially blocking retrieval of each element
def next(): DataBaseRecord
// potentially blocking retrieval all element
def toList(): List[DataBaseRecord]
}
trait DataBaseRecord
// #mapWithResource-blocking-api
val url: URL = ???
val userName = "Akka"
val password = "Hakking"
val dbDriver: DBDriver = ???
def mapWithResourceExample(): Unit = {
// #mapWithResource
// some database for JVM
val db: Database = ???
Source(
List(
"SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
"SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
.mapWithResource(() => dbDriver.create(url, userName, password))(
(connection, query) => db.doQuery(connection, query).toList(),
conn => {
conn.close()
None
})
.mapConcat(identity)
.runForeach(println)
// #mapWithResource
}
}
Loading

0 comments on commit 7021b33

Please sign in to comment.