-
Notifications
You must be signed in to change notification settings - Fork 156
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add mapWithResource stream operator.
- Loading branch information
Showing
14 changed files
with
885 additions
and
3 deletions.
There are no files selected for viewing
69 changes: 69 additions & 0 deletions
69
docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
# mapWithResource | ||
|
||
Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed. | ||
|
||
@ref[Simple operators](../index.md#simple-operators) | ||
|
||
## Signature | ||
|
||
@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" } | ||
1. `create`: Open or Create the resource. | ||
2. `f`: Transform each element inputs with the help of resource. | ||
3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element. | ||
|
||
## Description | ||
|
||
Transform each stream element with the help of a resource. | ||
The functions are by default called on Pekko's dispatcher for blocking IO to avoid interfering with other stream operations. | ||
See @ref:[Blocking Needs Careful Management](../../../typed/dispatchers.md#blocking-needs-careful-management) for an explanation on why this is important. | ||
The resource creation function is invoked once when the stream is materialized and the returned resource is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream. The returned T MUST NOT be null as it is illegal as stream element - according to the Reactive Streams specification. | ||
|
||
The close function is called when upstream or downstream completes normally or exceptionally, and will be called only once. | ||
- upstream completes or fails, the optional value returns by `close` will be emitted to downstream if defined. | ||
- downstream cancels or fails, the optional value returns by `close` will be ignored. | ||
- shutdowns abruptly, the optional value returns by `close` will be ignored. | ||
You can do some clean-up here. | ||
|
||
Early completion can be done with combination of the @apidoc[Flow.takeWhile](Flow) operator. | ||
|
||
See also @ref:[unfoldResource](../Source/unfoldResource.md), @ref:[unfoldResourceAsync](../Source/unfoldResourceAsync.md). | ||
|
||
You can configure the default dispatcher for this Source by changing the `org.apache.pekko.stream.materializer.blocking-io-dispatcher` | ||
or set it for a given Source by using ActorAttributes. | ||
|
||
## Examples | ||
|
||
Imagine we have a database API which may potentially block when we perform a query, | ||
and the database connection can be reused for each query. | ||
|
||
Scala | ||
: @@snip [UnfoldResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource-blocking-api } | ||
|
||
Java | ||
: @@snip [UnfoldResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource-blocking-api } | ||
|
||
Let's see how we make use of the API above safely through `mapWithResource`: | ||
|
||
Scala | ||
: @@snip [UnfoldResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource } | ||
|
||
Java | ||
: @@snip [UnfoldResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource } | ||
|
||
In this example we retrieve data form two tables with the same shared connection, and transform the results | ||
to individual records with @scala[`mapConcat(identity)`]@java[`mapConcat(elems -> elems)`], once done the connection is closed. | ||
|
||
|
||
## Reactive Streams semantics | ||
|
||
@@@div { .callout } | ||
|
||
**emits** the mapping function returns an element and downstream is ready to consume it | ||
|
||
**backpressures** downstream backpressures | ||
|
||
**completes** upstream completes | ||
|
||
**cancels** downstream cancels | ||
|
||
@@@ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package jdocs.stream.operators.sourceorflow; | ||
|
||
import org.apache.pekko.actor.ActorSystem; | ||
import org.apache.pekko.stream.javadsl.Source; | ||
|
||
import java.net.URL; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
public interface MapWithResource { | ||
// #mapWithResource-blocking-api | ||
interface DBDriver { | ||
Connection create(URL url, String userName, String password); | ||
} | ||
|
||
interface Connection { | ||
void close(); | ||
} | ||
|
||
interface Database { | ||
// blocking query | ||
QueryResult doQuery(Connection connection, String query); | ||
} | ||
|
||
interface QueryResult { | ||
boolean hasMore(); | ||
|
||
// potentially blocking retrieval of each element | ||
DatabaseRecord next(); | ||
|
||
// potentially blocking retrieval all element | ||
List<DatabaseRecord> toList(); | ||
} | ||
|
||
interface DatabaseRecord {} | ||
// #mapWithResource-blocking-api | ||
|
||
default void mapWithResourceExample() { | ||
final ActorSystem system = null; | ||
final URL url = null; | ||
final String userName = "Akka"; | ||
final String password = "Hakking"; | ||
final DBDriver dbDriver = null; | ||
// #mapWithResource | ||
// some database for JVM | ||
final Database db = null; | ||
Source.from( | ||
Arrays.asList( | ||
"SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;", | ||
"SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;")) | ||
.mapWithResource( | ||
() -> dbDriver.create(url, userName, password), | ||
(connection, query) -> db.doQuery(connection, query).toList(), | ||
connection -> { | ||
connection.close(); | ||
return Optional.empty(); | ||
}) | ||
.mapConcat(elems -> elems) | ||
.runForeach(System.out::println, system); | ||
// #mapWithResource | ||
} | ||
} |
70 changes: 70 additions & 0 deletions
70
docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package docs.stream.operators.sourceorflow | ||
|
||
import org.apache.pekko.actor.ActorSystem | ||
import org.apache.pekko.stream.scaladsl.Source | ||
|
||
import java.net.URL | ||
|
||
object MapWithResource { | ||
implicit val actorSystem: ActorSystem = ??? | ||
|
||
// #mapWithResource-blocking-api | ||
trait DBDriver { | ||
def create(url: URL, userName: String, password: String): Connection | ||
} | ||
trait Connection { | ||
def close(): Unit | ||
} | ||
trait Database { | ||
// blocking query | ||
def doQuery(connection: Connection, query: String): QueryResult = ??? | ||
} | ||
trait QueryResult { | ||
def hasMore: Boolean | ||
// potentially blocking retrieval of each element | ||
def next(): DataBaseRecord | ||
// potentially blocking retrieval all element | ||
def toList(): List[DataBaseRecord] | ||
} | ||
trait DataBaseRecord | ||
// #mapWithResource-blocking-api | ||
val url: URL = ??? | ||
val userName = "Akka" | ||
val password = "Hakking" | ||
val dbDriver: DBDriver = ??? | ||
def mapWithResourceExample(): Unit = { | ||
// #mapWithResource | ||
// some database for JVM | ||
val db: Database = ??? | ||
Source( | ||
List( | ||
"SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;", | ||
"SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;")) | ||
.mapWithResource(() => dbDriver.create(url, userName, password))( | ||
(connection, query) => db.doQuery(connection, query).toList(), | ||
conn => { | ||
conn.close() | ||
None | ||
}) | ||
.mapConcat(identity) | ||
.runForeach(println) | ||
// #mapWithResource | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.