Skip to content

Commit

Permalink
#245 Add the ability to query REST endpoints from Reader module
Browse files Browse the repository at this point in the history
* created Provider to query the data from server
* support for Future, IO, and ZIO based providers
* work in progress
  • Loading branch information
benedeki committed Oct 31, 2024
1 parent d773a93 commit 0776f9c
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 6 deletions.
16 changes: 12 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ object Dependencies {
private def jsonSerdeDependencies: Seq[ModuleID] = {

// Circe dependencies
lazy val circeCore = "io.circe" %% "circe-core" % Versions.circeJson
lazy val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson
lazy val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson
val circeCore = "io.circe" %% "circe-core" % Versions.circeJson
val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson
val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson

Seq(
circeCore,
Expand Down Expand Up @@ -237,8 +237,16 @@ object Dependencies {

def readerDependencies(scalaVersion: Version): Seq[ModuleID] = {
Seq(
"com.softwaremill.sttp.client3" %% "core" % "3.9.7",
"com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.9.6",
"com.softwaremill.sttp.client3" %% "armeria-backend-cats" % "3.9.8",
"com.softwaremill.sttp.client3" %% "zio" % "3.9.8",
"com.softwaremill.sttp.client3" %% "armeria-backend-zio" % "3.9.8",
"org.typelevel" %% "cats-effect" % "3.3.14",
"dev.zio" %% "zio" % "2.1.4",
) ++
testDependencies
testDependencies ++
jsonSerdeDependencies
}

def databaseDependencies: Seq[ModuleID] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package za.co.absa.atum.reader

class FlowReader {
import za.co.absa.atum.reader.basic.Reader
import za.co.absa.atum.reader.provider.Provider

// TODO
class FlowReader[F[_]](override implicit val provider: Provider[F]) extends Reader[F]{
def foo(): String = {
// just to have some testable content
"bar"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package za.co.absa.atum.reader

class PartitioningReader {
import cats.Monad
import za.co.absa.atum.reader.basic.Reader
import za.co.absa.atum.reader.provider.Provider

class PartitioningReader[F[_]: Monad](partitioning: Partitioning)(override implicit val provider: Provider[F[_]]) extends Reader[F] {
def foo(): String = {
// just to have some testable content
"bar"
Expand Down
21 changes: 21 additions & 0 deletions reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.basic

import za.co.absa.atum.reader.provider.Provider

abstract class Reader[F[_]](implicit val provider: Provider[F[_]])
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

abstract class ReaderException(message: String) extends Exception(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

import sttp.model.{RequestMetadata, StatusCode}

case class RequestException (
message: String,
responseBody: String,
statusCode: StatusCode,
request: RequestMetadata)
extends ReaderException(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.provider

import _root_.io.circe.parser.decode
import _root_.io.circe.Decoder
import com.typesafe.config.Config
import sttp.client3.{Response, SttpBackend, UriContext, basicRequest}
import za.co.absa.atum.reader.exceptions.RequestException

import scala.util.{Failure, Try}

/**
* A HttpProvider is a component that is responsible for providing teh data to readers using REST API
* @tparam F
*/
abstract class AbstractHttpProvider[F[_]](val serverUrl: String) extends Provider[F] {
type RequestFunction = SttpBackend[F, Any] => F[Response[Either[String, String]]]
type ResponseMapperFunction[R] = Response[Either[String, String]] => Try[R]

protected def executeRequest(requestFnc: RequestFunction): F[Response[Either[String, String]]]
protected def mapResponse[R](response: F[Response[Either[String, String]]], mapperFnc: ResponseMapperFunction[R]): F[Try[R]]

protected def query[R: Decoder](endpointUri: String): F[Try[R]] = {
val endpointToQuery = serverUrl + endpointUri
val request = basicRequest
.get(uri"$endpointToQuery")
val response = executeRequest(request.send(_))
mapResponse(response, responseMapperFunction[R])
}

private def responseMapperFunction[R: Decoder](response: Response[Either[String, String]]): Try[R] = {
response.body match {
case Left(error) => Failure(RequestException(response.statusText, error, response.code, response.request))
case Right(body) => decode[R](body).toTry
}
}

}

object AbstractHttpProvider {
final val UrlKey = "atum.server.url"

def atumServerUrl(config: Config): String = {
config.getString(UrlKey)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.provider

/**
* A basic class for defining methods that will be providing data to readers.
*/
abstract class Provider[F[_]] {
// here will come abstract methods that are to return data to readers
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.provider.future

import cats.implicits.catsStdInstancesForFuture
import com.typesafe.config.{Config, ConfigFactory}
import sttp.client3.Response
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
import za.co.absa.atum.reader.provider.AbstractHttpProvider

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try


class HttpProvider(serverUrl: String)(implicit executor: ExecutionContext) extends AbstractHttpProvider[Future](serverUrl) {

def this(config: Config = ConfigFactory.load())(implicit executor: ExecutionContext) = {
this(AbstractHttpProvider.atumServerUrl(config ))(executor)
}

private val asyncHttpClientFutureBackend = AsyncHttpClientFutureBackend()

override protected def executeRequest(requestFnc: RequestFunction): Future[Response[Either[String, String]]] = {
requestFnc(asyncHttpClientFutureBackend)
}

override protected def mapResponse[R](
response: Future[Response[Either[String, String]]],
mapperFnc: ResponseMapperFunction[R]): Future[Try[R]] = {
response.map(mapperFnc)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package za.co.absa.atum.reader.provider.io

import cats.effect.IO
import com.typesafe.config.{Config, ConfigFactory}
import sttp.client3.Response
import sttp.client3.armeria.cats.ArmeriaCatsBackend
import za.co.absa.atum.reader.provider.AbstractHttpProvider

import scala.util.Try

class HttpProvider(serverUrl: String) extends AbstractHttpProvider[IO](serverUrl) {

def this(config: Config = ConfigFactory.load()) = {
this(AbstractHttpProvider.atumServerUrl(config ))
}

override protected def executeRequest(requestFnc: RequestFunction): IO[Response[Either[String, String]]] = {
ArmeriaCatsBackend
.resource[IO]()
.use(requestFnc)
}

override protected def mapResponse[R](
response: IO[Response[Either[String, String]]],
mapperFnc: ResponseMapperFunction[R]): IO[Try[R]] = {
response.map(mapperFnc)
}
}

object HttpProvider {
lazy implicit val httpProvider: HttpProvider = new HttpProvider()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package za.co.absa.atum.reader.provider.zio

import com.typesafe.config.{Config, ConfigFactory}
import sttp.client3.Response
import sttp.client3.armeria.zio.ArmeriaZioBackend
import za.co.absa.atum.reader.provider.AbstractHttpProvider
import zio.ZIO

import scala.util.Try

class HttpProvider(serverUrl: String) extends AbstractHttpProvider[ZIO](serverUrl) {

def this(config: Config = ConfigFactory.load()) = {
this(AbstractHttpProvider.atumServerUrl(config ))
}



override protected def executeRequest(requestFnc: RequestFunction): ZIO[Response[Either[String, String]]] = {
ArmeriaZioBackend.usingDefaultClient().map(requestFnc)
}

override protected def mapResponse[R](response: ZIO[Response[Either[String, String]]], mapperFnc: ResponseMapperFunction[R]): ZIO[Try[R]] = ???
} //TODO

0 comments on commit 0776f9c

Please sign in to comment.