Skip to content

Commit 5244bfd

Browse files
committed
#245 Add the ability to query REST endpoints from Reader module
* created Provider to query the data from server * support for Future, IO, and ZIO based providers * work in progress
1 parent 15b9a63 commit 5244bfd

File tree

11 files changed

+275
-6
lines changed

11 files changed

+275
-6
lines changed

project/Dependencies.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ object Dependencies {
9090
private def jsonSerdeDependencies: Seq[ModuleID] = {
9191

9292
// Circe dependencies
93-
lazy val circeCore = "io.circe" %% "circe-core" % Versions.circeJson
94-
lazy val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson
95-
lazy val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson
93+
val circeCore = "io.circe" %% "circe-core" % Versions.circeJson
94+
val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson
95+
val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson
9696

9797
Seq(
9898
circeCore,
@@ -231,8 +231,16 @@ object Dependencies {
231231

232232
def readerDependencies(scalaVersion: Version): Seq[ModuleID] = {
233233
Seq(
234+
"com.softwaremill.sttp.client3" %% "core" % "3.9.7",
235+
"com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.9.6",
236+
"com.softwaremill.sttp.client3" %% "armeria-backend-cats" % "3.9.8",
237+
"com.softwaremill.sttp.client3" %% "zio" % "3.9.8",
238+
"com.softwaremill.sttp.client3" %% "armeria-backend-zio" % "3.9.8",
239+
"org.typelevel" %% "cats-effect" % "3.3.14",
240+
"dev.zio" %% "zio" % "2.1.4",
234241
) ++
235-
testDependencies
242+
testDependencies ++
243+
jsonSerdeDependencies
236244
}
237245

238246
def databaseDependencies: Seq[ModuleID] = {

reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package za.co.absa.atum.reader
1818

19-
class FlowReader {
19+
import za.co.absa.atum.reader.basic.Reader
20+
import za.co.absa.atum.reader.provider.Provider
21+
22+
// TODO
23+
class FlowReader[F[_]](override implicit val provider: Provider[F]) extends Reader[F]{
2024
def foo(): String = {
2125
// just to have some testable content
2226
"bar"

reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package za.co.absa.atum.reader
1818

19-
class PartitioningReader {
19+
import cats.Monad
20+
import za.co.absa.atum.reader.basic.Reader
21+
import za.co.absa.atum.reader.provider.Provider
22+
23+
class PartitioningReader[F[_]: Monad](partitioning: Partitioning)(override implicit val provider: Provider[F[_]]) extends Reader[F] {
2024
def foo(): String = {
2125
// just to have some testable content
2226
"bar"
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2024 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.reader.basic
18+
19+
import za.co.absa.atum.reader.provider.Provider
20+
21+
abstract class Reader[F[_]](implicit val provider: Provider[F[_]])
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright 2024 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.reader.exceptions
18+
19+
abstract class ReaderException(message: String) extends Exception(message)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2024 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.reader.exceptions
18+
19+
import sttp.model.{RequestMetadata, StatusCode}
20+
21+
case class RequestException (
22+
message: String,
23+
responseBody: String,
24+
statusCode: StatusCode,
25+
request: RequestMetadata)
26+
extends ReaderException(message)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2024 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.reader.provider
18+
19+
import _root_.io.circe.parser.decode
20+
import _root_.io.circe.Decoder
21+
import com.typesafe.config.Config
22+
import sttp.client3.{Response, SttpBackend, UriContext, basicRequest}
23+
import za.co.absa.atum.reader.exceptions.RequestException
24+
25+
import scala.util.{Failure, Try}
26+
27+
/**
28+
* A HttpProvider is a component that is responsible for providing teh data to readers using REST API
29+
* @tparam F
30+
*/
31+
abstract class AbstractHttpProvider[F[_]](val serverUrl: String) extends Provider[F] {
32+
type RequestFunction = SttpBackend[F, Any] => F[Response[Either[String, String]]]
33+
type ResponseMapperFunction[R] = Response[Either[String, String]] => Try[R]
34+
35+
protected def executeRequest(requestFnc: RequestFunction): F[Response[Either[String, String]]]
36+
protected def mapResponse[R](response: F[Response[Either[String, String]]], mapperFnc: ResponseMapperFunction[R]): F[Try[R]]
37+
38+
protected def query[R: Decoder](endpointUri: String): F[Try[R]] = {
39+
val endpointToQuery = serverUrl + endpointUri
40+
val request = basicRequest
41+
.get(uri"$endpointToQuery")
42+
val response = executeRequest(request.send(_))
43+
mapResponse(response, responseMapperFunction[R])
44+
}
45+
46+
private def responseMapperFunction[R: Decoder](response: Response[Either[String, String]]): Try[R] = {
47+
response.body match {
48+
case Left(error) => Failure(RequestException(response.statusText, error, response.code, response.request))
49+
case Right(body) => decode[R](body).toTry
50+
}
51+
}
52+
53+
}
54+
55+
object AbstractHttpProvider {
56+
final val UrlKey = "atum.server.url"
57+
58+
def atumServerUrl(config: Config): String = {
59+
config.getString(UrlKey)
60+
}
61+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2024 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.reader.provider
18+
19+
/**
20+
* A basic class for defining methods that will be providing data to readers.
21+
*/
22+
abstract class Provider[F[_]] {
23+
// here will come abstract methods that are to return data to readers
24+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.atum.reader.provider.future
18+
19+
import cats.implicits.catsStdInstancesForFuture
20+
import com.typesafe.config.{Config, ConfigFactory}
21+
import sttp.client3.Response
22+
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
23+
import za.co.absa.atum.reader.provider.AbstractHttpProvider
24+
25+
import scala.concurrent.{ExecutionContext, Future}
26+
import scala.util.Try
27+
28+
29+
class HttpProvider(serverUrl: String)(implicit executor: ExecutionContext) extends AbstractHttpProvider[Future](serverUrl) {
30+
31+
def this(config: Config = ConfigFactory.load())(implicit executor: ExecutionContext) = {
32+
this(AbstractHttpProvider.atumServerUrl(config ))(executor)
33+
}
34+
35+
private val asyncHttpClientFutureBackend = AsyncHttpClientFutureBackend()
36+
37+
override protected def executeRequest(requestFnc: RequestFunction): Future[Response[Either[String, String]]] = {
38+
requestFnc(asyncHttpClientFutureBackend)
39+
}
40+
41+
override protected def mapResponse[R](
42+
response: Future[Response[Either[String, String]]],
43+
mapperFnc: ResponseMapperFunction[R]): Future[Try[R]] = {
44+
response.map(mapperFnc)
45+
}
46+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package za.co.absa.atum.reader.provider.io
2+
3+
import cats.effect.IO
4+
import com.typesafe.config.{Config, ConfigFactory}
5+
import sttp.client3.Response
6+
import sttp.client3.armeria.cats.ArmeriaCatsBackend
7+
import za.co.absa.atum.reader.provider.AbstractHttpProvider
8+
9+
import scala.util.Try
10+
11+
class HttpProvider(serverUrl: String) extends AbstractHttpProvider[IO](serverUrl) {
12+
13+
def this(config: Config = ConfigFactory.load()) = {
14+
this(AbstractHttpProvider.atumServerUrl(config ))
15+
}
16+
17+
override protected def executeRequest(requestFnc: RequestFunction): IO[Response[Either[String, String]]] = {
18+
ArmeriaCatsBackend
19+
.resource[IO]()
20+
.use(requestFnc)
21+
}
22+
23+
override protected def mapResponse[R](
24+
response: IO[Response[Either[String, String]]],
25+
mapperFnc: ResponseMapperFunction[R]): IO[Try[R]] = {
26+
response.map(mapperFnc)
27+
}
28+
}
29+
30+
object HttpProvider {
31+
lazy implicit val httpProvider: HttpProvider = new HttpProvider()
32+
}

0 commit comments

Comments
 (0)