From 0776f9c4970bad41ca796d11d8378a080e60942c Mon Sep 17 00:00:00 2001 From: David Benedeki Date: Tue, 10 Sep 2024 15:24:51 +0200 Subject: [PATCH] #245 Add the ability to query REST endpoints from Reader module * created Provider to query the data from server * support for Future, IO, and ZIO based providers * work in progress --- project/Dependencies.scala | 16 +++-- .../za/co/absa/atum/reader/FlowReader.scala | 6 +- .../absa/atum/reader/PartitioningReader.scala | 6 +- .../za/co/absa/atum/reader/basic/Reader.scala | 21 +++++++ .../reader/exceptions/ReaderException.scala | 19 ++++++ .../reader/exceptions/RequestException.scala | 26 ++++++++ .../provider/AbstractHttpProvider.scala | 61 +++++++++++++++++++ .../absa/atum/reader/provider/Provider.scala | 24 ++++++++ .../reader/provider/future/HttpProvider.scala | 46 ++++++++++++++ .../reader/provider/io/HttpProvider.scala | 32 ++++++++++ .../reader/provider/zio/HttpProvider.scala | 24 ++++++++ 11 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/provider/AbstractHttpProvider.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/provider/Provider.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/provider/future/HttpProvider.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/provider/io/HttpProvider.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/provider/zio/HttpProvider.scala diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e9783300e..df9fa90c3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -90,9 +90,9 @@ object Dependencies { private def jsonSerdeDependencies: Seq[ModuleID] = { // Circe dependencies - lazy val circeCore = "io.circe" %% "circe-core" % Versions.circeJson - lazy val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson - lazy val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson + val circeCore = "io.circe" %% "circe-core" % Versions.circeJson + val circeParser = "io.circe" %% "circe-parser" % Versions.circeJson + val circeGeneric = "io.circe" %% "circe-generic" % Versions.circeJson Seq( circeCore, @@ -237,8 +237,16 @@ object Dependencies { def readerDependencies(scalaVersion: Version): Seq[ModuleID] = { Seq( + "com.softwaremill.sttp.client3" %% "core" % "3.9.7", + "com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % "3.9.6", + "com.softwaremill.sttp.client3" %% "armeria-backend-cats" % "3.9.8", + "com.softwaremill.sttp.client3" %% "zio" % "3.9.8", + "com.softwaremill.sttp.client3" %% "armeria-backend-zio" % "3.9.8", + "org.typelevel" %% "cats-effect" % "3.3.14", + "dev.zio" %% "zio" % "2.1.4", ) ++ - testDependencies + testDependencies ++ + jsonSerdeDependencies } def databaseDependencies: Seq[ModuleID] = { diff --git a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala index 6c45d504e..09f1b0bf2 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala @@ -16,7 +16,11 @@ package za.co.absa.atum.reader -class FlowReader { +import za.co.absa.atum.reader.basic.Reader +import za.co.absa.atum.reader.provider.Provider + +// TODO +class FlowReader[F[_]](override implicit val provider: Provider[F]) extends Reader[F]{ def foo(): String = { // just to have some testable content "bar" diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index d1153e4b5..263254934 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -16,7 +16,11 @@ package za.co.absa.atum.reader -class PartitioningReader { +import cats.Monad +import za.co.absa.atum.reader.basic.Reader +import za.co.absa.atum.reader.provider.Provider + +class PartitioningReader[F[_]: Monad](partitioning: Partitioning)(override implicit val provider: Provider[F[_]]) extends Reader[F] { def foo(): String = { // just to have some testable content "bar" diff --git a/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala b/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala new file mode 100644 index 000000000..ba4c65678 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/basic/Reader.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.basic + +import za.co.absa.atum.reader.provider.Provider + +abstract class Reader[F[_]](implicit val provider: Provider[F[_]]) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala new file mode 100644 index 000000000..d668bd39b --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/ReaderException.scala @@ -0,0 +1,19 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.exceptions + +abstract class ReaderException(message: String) extends Exception(message) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala new file mode 100644 index 000000000..c5130cd59 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/exceptions/RequestException.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.exceptions + +import sttp.model.{RequestMetadata, StatusCode} + +case class RequestException ( + message: String, + responseBody: String, + statusCode: StatusCode, + request: RequestMetadata) + extends ReaderException(message) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/provider/AbstractHttpProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/provider/AbstractHttpProvider.scala new file mode 100644 index 000000000..7eb811a91 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/provider/AbstractHttpProvider.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.provider + +import _root_.io.circe.parser.decode +import _root_.io.circe.Decoder +import com.typesafe.config.Config +import sttp.client3.{Response, SttpBackend, UriContext, basicRequest} +import za.co.absa.atum.reader.exceptions.RequestException + +import scala.util.{Failure, Try} + +/** + * A HttpProvider is a component that is responsible for providing teh data to readers using REST API + * @tparam F + */ +abstract class AbstractHttpProvider[F[_]](val serverUrl: String) extends Provider[F] { + type RequestFunction = SttpBackend[F, Any] => F[Response[Either[String, String]]] + type ResponseMapperFunction[R] = Response[Either[String, String]] => Try[R] + + protected def executeRequest(requestFnc: RequestFunction): F[Response[Either[String, String]]] + protected def mapResponse[R](response: F[Response[Either[String, String]]], mapperFnc: ResponseMapperFunction[R]): F[Try[R]] + + protected def query[R: Decoder](endpointUri: String): F[Try[R]] = { + val endpointToQuery = serverUrl + endpointUri + val request = basicRequest + .get(uri"$endpointToQuery") + val response = executeRequest(request.send(_)) + mapResponse(response, responseMapperFunction[R]) + } + + private def responseMapperFunction[R: Decoder](response: Response[Either[String, String]]): Try[R] = { + response.body match { + case Left(error) => Failure(RequestException(response.statusText, error, response.code, response.request)) + case Right(body) => decode[R](body).toTry + } + } + +} + +object AbstractHttpProvider { + final val UrlKey = "atum.server.url" + + def atumServerUrl(config: Config): String = { + config.getString(UrlKey) + } +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/provider/Provider.scala b/reader/src/main/scala/za/co/absa/atum/reader/provider/Provider.scala new file mode 100644 index 000000000..c6d631787 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/provider/Provider.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.provider + +/** + * A basic class for defining methods that will be providing data to readers. + */ +abstract class Provider[F[_]] { + // here will come abstract methods that are to return data to readers +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/provider/future/HttpProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/provider/future/HttpProvider.scala new file mode 100644 index 000000000..e2cd69f61 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/provider/future/HttpProvider.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.reader.provider.future + +import cats.implicits.catsStdInstancesForFuture +import com.typesafe.config.{Config, ConfigFactory} +import sttp.client3.Response +import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend +import za.co.absa.atum.reader.provider.AbstractHttpProvider + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try + + +class HttpProvider(serverUrl: String)(implicit executor: ExecutionContext) extends AbstractHttpProvider[Future](serverUrl) { + + def this(config: Config = ConfigFactory.load())(implicit executor: ExecutionContext) = { + this(AbstractHttpProvider.atumServerUrl(config ))(executor) + } + + private val asyncHttpClientFutureBackend = AsyncHttpClientFutureBackend() + + override protected def executeRequest(requestFnc: RequestFunction): Future[Response[Either[String, String]]] = { + requestFnc(asyncHttpClientFutureBackend) + } + + override protected def mapResponse[R]( + response: Future[Response[Either[String, String]]], + mapperFnc: ResponseMapperFunction[R]): Future[Try[R]] = { + response.map(mapperFnc) + } +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/provider/io/HttpProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/provider/io/HttpProvider.scala new file mode 100644 index 000000000..d8d1ba9b9 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/provider/io/HttpProvider.scala @@ -0,0 +1,32 @@ +package za.co.absa.atum.reader.provider.io + +import cats.effect.IO +import com.typesafe.config.{Config, ConfigFactory} +import sttp.client3.Response +import sttp.client3.armeria.cats.ArmeriaCatsBackend +import za.co.absa.atum.reader.provider.AbstractHttpProvider + +import scala.util.Try + +class HttpProvider(serverUrl: String) extends AbstractHttpProvider[IO](serverUrl) { + + def this(config: Config = ConfigFactory.load()) = { + this(AbstractHttpProvider.atumServerUrl(config )) + } + + override protected def executeRequest(requestFnc: RequestFunction): IO[Response[Either[String, String]]] = { + ArmeriaCatsBackend + .resource[IO]() + .use(requestFnc) + } + + override protected def mapResponse[R]( + response: IO[Response[Either[String, String]]], + mapperFnc: ResponseMapperFunction[R]): IO[Try[R]] = { + response.map(mapperFnc) + } +} + +object HttpProvider { + lazy implicit val httpProvider: HttpProvider = new HttpProvider() +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/provider/zio/HttpProvider.scala b/reader/src/main/scala/za/co/absa/atum/reader/provider/zio/HttpProvider.scala new file mode 100644 index 000000000..a1885447d --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/provider/zio/HttpProvider.scala @@ -0,0 +1,24 @@ +package za.co.absa.atum.reader.provider.zio + +import com.typesafe.config.{Config, ConfigFactory} +import sttp.client3.Response +import sttp.client3.armeria.zio.ArmeriaZioBackend +import za.co.absa.atum.reader.provider.AbstractHttpProvider +import zio.ZIO + +import scala.util.Try + +class HttpProvider(serverUrl: String) extends AbstractHttpProvider[ZIO](serverUrl) { + + def this(config: Config = ConfigFactory.load()) = { + this(AbstractHttpProvider.atumServerUrl(config )) + } + + + + override protected def executeRequest(requestFnc: RequestFunction): ZIO[Response[Either[String, String]]] = { + ArmeriaZioBackend.usingDefaultClient().map(requestFnc) + } + + override protected def mapResponse[R](response: ZIO[Response[Either[String, String]]], mapperFnc: ResponseMapperFunction[R]): ZIO[Try[R]] = ??? +} //TODO