Skip to content

Commit

Permalink
get partitioning measures: 229 (#249)
Browse files Browse the repository at this point in the history
* Defining and implementing PL/PG function

* Defining measure dto

* Removing measure ID

* defining get class to call db-function

* Implement db call class

* removing unused import

* adding integration tests

* restoring db-function

* Adding test and fixing the build

* Adding tests layer and updating sql function

* Implementing service functionality

* Implementing Controller functionality

* fixing return type

* fixing merge conflicts

* Adding the endpoint

* Adding repository unit test

* Adding service unit test

* Fixing merge conflicts

* Fixing fileName conflicts

* removing unused file

* Changing file name

* fixing file format

* Removing outdated tests

* Fixing the function and the test cases

* Restoring files

* implementing endpoint test-cases

* Fixing endpoint tests

* Addressing GitHub comments

* Update database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala

Co-authored-by: David Benedeki <[email protected]>

* Update database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMeasuresByIdV2IntegrationTests.scala

Co-authored-by: David Benedeki <[email protected]>

---------

Co-authored-by: David Benedeki <[email protected]>
  • Loading branch information
TebaleloS and benedeki authored Sep 4, 2024
1 parent c27c776 commit d31eded
Show file tree
Hide file tree
Showing 16 changed files with 450 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Function: runs.get_partitioning_measures_by_id(Long)
CREATE OR REPLACE FUNCTION runs.get_partitioning_measures_by_id(
IN i_partitioning_id BIGINT,
OUT status INTEGER,
OUT status_text TEXT,
OUT measure_name TEXT,
OUT measured_columns TEXT[]
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_measures_by_id(1)
-- Returns measures for the given partitioning id
--
-- Parameters:
-- i_partitioning_id - partitioning id we are asking the measures for
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- measure_name - Name of the measure
-- measured_columns - Array of columns associated with the measure
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN

PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;

IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

RETURN QUERY
SELECT 11, 'OK', MD.measure_name, MD.measured_columns
FROM runs.measure_definitions AS MD
WHERE MD.fk_partitioning = i_partitioning_id;

RETURN;

END;
$$
LANGUAGE plpgsql volatile SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_measures_by_id(BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_measures_by_id(BIGINT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString
import za.co.absa.balta.classes.setter.CustomDBType

class GetPartitioningMeasuresByIdV2IntegrationTests extends DBTestSuite {
private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id"

private val partitioning: JsonBString = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

test("Get partitioning measures by id should return partitioning measures for partitioning with measures") {

table("runs.partitionings").insert(
add("partitioning", partitioning)
.add("created_by", "Thomas")
)

val fkPartitioning: Long = table("runs.partitionings")
.fieldValue("partitioning", partitioning, "id_partitioning").get.get

table("runs.measure_definitions").insert(
add("fk_partitioning", fkPartitioning)
.add("created_by", "Thomas")
.add("measure_name", "measure1")
.add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]"))
)

table("runs.measure_definitions").insert(
add("fk_partitioning", fkPartitioning)
.add("created_by", "Thomas")
.add("measure_name", "measure2")
.add("measured_columns", CustomDBType("""{"col2"}""", "TEXT[]"))
)

function(fncGetPartitioningMeasuresById)
.setParam("i_partitioning_id", fkPartitioning)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(11))
assert(results.getString("status_text").contains("OK"))
assert(results.getString("measure_name").contains("measure1"))
assert(results.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1")))

val results2 = queryResult.next()
assert(results2.getInt("status").contains(11))
assert(results2.getString("status_text").contains("OK"))
assert(results2.getString("measure_name").contains("measure2"))
assert(results2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2")))
}
}

test("Get partitioning measures by id should return error for partitioning without measures") {

table("runs.partitionings").insert(
add("partitioning", partitioning)
.add("created_by", "Thomas")
)

val fkPartitioning: Long = table("runs.partitionings")
.fieldValue("partitioning", partitioning, "id_partitioning").get.get

function(fncGetPartitioningMeasuresById)
.setParam(fkPartitioning)
.execute { queryResult =>
assert(!queryResult.hasNext)
}
}

test("Get partitioning measures by id should return an error for non-existing partitioning") {

function(fncGetPartitioningMeasuresById)
.setParam(999)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(41))
assert(results.getString("status_text").contains("Partitioning not found"))
assert(!queryResult.hasNext) // checking no more records are returned.
}
}

}
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ object Main extends ZIOAppDefault with Server {
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningMeasuresById.layer,
GetPartitioningAdditionalData.layer,
GetPartitioningAdditionalDataV2.layer,
CreateOrUpdateAdditionalData.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ trait PartitioningController {

def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]


def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
)
}

override def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[MeasureDTO], Seq[MeasureDTO]](
partitioningService.getPartitioningMeasuresById(partitioningId)
)
)
}

}

object PartitioningControllerImpl {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import zio._
import za.co.absa.atum.server.model.MeasureFromDB

import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

class GetPartitioningMeasuresById(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[Long, MeasureFromDB, Task](values =>
Seq(fr"${values}")
) with StandardStatusHandling with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns")
}

object GetPartitioningMeasuresById {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningMeasuresById] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningMeasuresById()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ trait Endpoints extends BaseEndpoints {
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val getPartitioningMeasuresEndpointV2
: PublicEndpoint[Long, ErrorResponse, MultiSuccessResponse[MeasureDTO], Any] = {
apiV2.get
.in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Measures)
.out(statusCode(StatusCode.Ok))
.out(jsonBody[MultiSuccessResponse[MeasureDTO]])
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = {
endpoint.get.in(ZioMetrics).out(stringBody)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter
import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.swagger.bundle.SwaggerInterpreter
import sttp.tapir.ztapir._
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO}
import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion}
import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController}
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
Expand Down Expand Up @@ -85,6 +85,7 @@ trait Routes extends Endpoints with ServerOptions {
createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2),
createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2),
createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2),
createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2),
createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit)
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes
Expand All @@ -102,7 +103,8 @@ trait Routes extends Endpoints with ServerOptions {
patchPartitioningAdditionalDataEndpointV2,
getPartitioningCheckpointsEndpointV2,
getPartitioningCheckpointEndpointV2,
getFlowCheckpointsEndpointV2
getFlowCheckpointsEndpointV2,
getPartitioningMeasuresEndpointV2
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None))
.from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ trait PartitioningRepository {

def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO]


def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class PartitioningRepositoryImpl(
createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData,
getPartitioningCheckpointsFn: GetPartitioningCheckpoints,
getPartitioningByIdFn: GetPartitioningById,
getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2
getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2,
getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById
) extends PartitioningRepository
with BaseRepository {

Expand Down Expand Up @@ -98,6 +99,14 @@ class PartitioningRepositoryImpl(
}
}


override def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] = {
dbMultipleResultCallWithAggregatedStatus(getPartitioningMeasuresByIdFn(partitioningId), "getPartitioningMeasures")
.map(_.map { case MeasureFromDB(measureName, measuredColumns) =>
MeasureDTO(measureName.get, measuredColumns.get)
})
}

}

object PartitioningRepositoryImpl {
Expand All @@ -108,8 +117,8 @@ object PartitioningRepositoryImpl {
with CreateOrUpdateAdditionalData
with GetPartitioningCheckpoints
with GetPartitioningAdditionalDataV2
with GetPartitioningCheckpoints
with GetPartitioningById,
with GetPartitioningById
with GetPartitioningMeasuresById,
PartitioningRepository
] = ZLayer {
for {
Expand All @@ -120,14 +129,16 @@ object PartitioningRepositoryImpl {
getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints]
getPartitioningById <- ZIO.service[GetPartitioningById]
getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2]
getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById]
} yield new PartitioningRepositoryImpl(
createPartitioningIfNotExists,
getPartitioningMeasures,
getPartitioningAdditionalData,
createOrUpdateAdditionalData,
getPartitioningCheckpoints,
getPartitioningById,
getPartitioningAdditionalDataV2
getPartitioningAdditionalDataV2,
getPartitioningMeasuresV2
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ trait PartitioningService {

def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO]

def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
override def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] = {
repositoryCall(partitioningRepository.getPartitioning(partitioningId), "getPartitioning")
}

override def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] = {
repositoryCall(
partitioningRepository.getPartitioningMeasuresById(partitioningId),
"getPartitioningMeasuresById"
)
}

}

object PartitioningServiceImpl {
Expand Down
Loading

0 comments on commit d31eded

Please sign in to comment.