From 45e7578c58f20c0b98d8a52714af8fae3be32c6a Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Tue, 12 Nov 2024 15:48:50 +0200 Subject: [PATCH 1/3] Feature 273 First Commit Feature 273 First Commit by Liam Leibrandt --- .github/CODEOWNERS | 2 +- database/README.md | 7 + .../postgres/runs/V0.3.0.1__get_ancestors.sql | 141 +++++ .../runs/GetAncestorsIntegrationTests.scala | 495 ++++++++++++++++++ ...PartitioningMainFlowIntegrationTests.scala | 2 + project/plugins.sbt | 4 +- publish.sbt | 6 + .../scala/za/co/absa/atum/server/Main.scala | 1 + .../controller/PartitioningController.scala | 7 + .../PartitioningControllerImpl.scala | 14 + .../runs/functions/GetAncestors.scala | 86 +++ .../absa/atum/server/api/http/Endpoints.scala | 13 + .../co/absa/atum/server/api/http/Routes.scala | 10 + .../repository/PartitioningRepository.scala | 6 + .../PartitioningRepositoryImpl.scala | 27 + .../api/service/PartitioningService.scala | 6 + .../api/service/PartitioningServiceImpl.scala | 11 + .../za/co/absa/atum/server/api/TestData.scala | 22 + .../PartitioningControllerUnitTests.scala | 26 + .../GetAncestorsIntegrationTests.scala | 47 ++ .../GetAncestorsEndpointV2UnitTests.scala | 118 +++++ .../PartitioningRepositoryUnitTests.scala | 32 ++ .../PartitioningServiceUnitTests.scala | 31 ++ 23 files changed, 1111 insertions(+), 3 deletions(-) create mode 100644 database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cec71281c..b39c52df7 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel +* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel @ABLL526 diff --git a/database/README.md b/database/README.md index ebdbe0801..963a16cdd 100644 --- a/database/README.md +++ b/database/README.md @@ -32,3 +32,10 @@ to remove them or sbt flywayBaseline ``` to set the current state as the baseline. + +```zsh +docker kill atum_db +docker rm atum_db +docker run --name=atum_db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=atum_db -p 5432:5432 -d postgres:16 +sbt flywayMigrate +``` \ No newline at end of file diff --git a/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql new file mode 100644 index 000000000..ab63e40e5 --- /dev/null +++ b/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql @@ -0,0 +1,141 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION runs.get_ancestors( + IN i_partitioning_id BIGINT, + IN i_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + OUT status INTEGER, + OUT status_text TEXT, + OUT ancestor_id BIGINT, + OUT partitioning JSONB, + OUT author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.get_ancestors(1) +-- Returns Ancestors' partition ID for the given id +-- +-- Parameters: +-- i_partitioning_id - id that we asking the Ancestors for +-- i_limit - (optional) maximum number of partitionings to return, default is 5 +-- i_offset - (optional) offset to use for pagination, default is 0 +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- ancestor_id - ID of Ancestor partition +-- partitioning - partitioning data of ancestor +-- author - author of the Ancestor partitioning +-- has_more - Flag indicating if there are more partitionings available + +-- Status codes: +-- 11 - OK +-- 41 - Partitioning not found +-- +------------------------------------------------------------------------------- +DECLARE + flow_id BIGINT[]; + var BIGINT; + partitionCreateAt TIMESTAMP; + _has_more BOOLEAN; + +BEGIN + + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Child Partitioning not found'; + has_more := FALSE; + RETURN NEXT; + RETURN; + END IF; + + SELECT created_at FROM runs.partitionings WHERE id_partitioning = i_partitioning_id INTO partitionCreateAt; + + flow_id := array( + SELECT fk_flow AS flow_id + FROM flows.partitioning_to_flow + WHERE fk_partitioning = i_partitioning_id + ); + + IF cardinality(flow_id) = 0 + THEN + status := 41; + status_text := 'Flow not found'; + has_more := FALSE; + RETURN NEXT; + RETURN; + END IF; + + FOREACH var IN ARRAY flow_id LOOP + IF i_limit IS NOT NULL THEN + SELECT count(*) > i_limit + FROM flows.partitioning_to_flow PTF + WHERE PTF.fk_flow = var + LIMIT i_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + END LOOP; + + RETURN QUERY + SELECT + 11 AS status, + 'OK' AS status_text, + P.id_partitioning AS ancestor_id, + P.partitioning AS partitioning, + P.created_by AS author, + _has_more AS has_more + FROM + runs.partitionings P + INNER JOIN + flows.partitioning_to_flow PF + ON PF.fk_partitioning = P.id_partitioning + INNER JOIN + (SELECT * FROM flows.partitioning_to_flow OLD_PF WHERE OLD_PF.fk_partitioning = i_partitioning_id) PF2 + ON Pf2.fk_flow = PF.fk_flow + WHERE + P.id_partitioning < i_partitioning_id + AND + P.created_at <= partitionCreateAt + GROUP BY P.id_partitioning + ORDER BY + P.id_partitioning, + P.created_at DESC + LIMIT i_limit + OFFSET i_offset; + + IF FOUND THEN + status := 11; + status_text := 'OK'; + ELSE + status := 41; + status_text := 'Ancestor Partitioning not found'; + has_more := FALSE; + END IF; + RETURN NEXT; + RETURN; + +END; +$$ + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala new file mode 100644 index 000000000..ad1848163 --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -0,0 +1,495 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import io.circe.parser.parse +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +class GetAncestorsIntegrationTests extends DBTestSuite { + + private val getAncestorsFn = "runs.get_ancestors" + private val partitioningsTable = "runs.partitionings" + + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValuesMap": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning1 = parse(partitioning1.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValuesMap": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning2 = parse(partitioning2.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning3 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyG", "keyH", "keyI"], + | "keysToValuesMap": { + | "keyG": "valueG", + | "keyH": "valueH", + | "keyI": "valueI" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning3 = parse(partitioning3.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning4 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyJ", "keyK", "keyL"], + | "keysToValuesMap": { + | "keyJ": "valueJ", + | "keyK": "valueK", + | "keyL": "valueL" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning4 = parse(partitioning4.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning5 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyM", "keyN", "keyO"], + | "keysToValuesMap": { + | "keyM": "valueM", + | "keyN": "valueN", + | "keyO": "valueO" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning5 = parse(partitioning5.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning6 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyP", "keyQ", "keyR"], + | "keysToValuesMap": { + | "keyP": "valueP", + | "keyQ": "valueQ", + | "keyR": "valueR" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning6 = parse(partitioning6.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning7 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyS", "keyT", "keyU"], + | "keysToValuesMap": { + | "keyS": "valueS", + | "keyT": "valueT", + | "keyU": "valueU" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning7 = parse(partitioning7.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning8 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyV", "keyW", "keyX"], + | "keysToValuesMap": { + | "keyV": "valueV", + | "keyW": "valueW", + | "keyX": "valueX" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns Ancestors for a given Partition ID") { + + table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Grandpa")) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Father")) + table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Son")) + table(partitioningsTable).insert(add("partitioning", partitioning4).add("created_by", "Grandson")) + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "Grandma")) + table(partitioningsTable).insert(add("partitioning", partitioning6).add("created_by", "Mother")) + table(partitioningsTable).insert(add("partitioning", partitioning7).add("created_by", "Daughter")) + table(partitioningsTable).insert(add("partitioning", partitioning8).add("created_by", "Granddaughter")) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + val partId3: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning3, "id_partitioning").get.get + + val partId4: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning4, "id_partitioning").get.get + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + val partId6: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning6, "id_partitioning").get.get + + val partId7: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning7, "id_partitioning").get.get + + val partId8: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning8, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Father") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId6) + .setParam("i_by_user", "Daughter") + .execute { queryResult => + flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1) + .setParam("i_fk_partitioning", partId3) + .setParam("i_by_user", "Son") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId2) + .setParam("i_fk_partitioning", partId4) + .setParam("i_by_user", "Grandson") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId6) + .setParam("i_fk_partitioning", partId7) + .setParam("i_by_user", "GrandDaughter") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId3) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId4) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId5) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId7) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + //TEST 1 Ancestors Partition + function(getAncestorsFn) + .setParam("i_partitioning_id", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + } + + //TEST multiple Ancestors Partitions + function(getAncestorsFn) + .setParam("i_partitioning_id", partId5) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + } + + //TEST Separate flow for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_partitioning_id", partId7) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + } + + //TEST ALL flows for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_partitioning_id", partId8) + .setParam("i_limit", 10) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId5)) + assert(returnedPartitioningParsed == expectedPartitioning5) + assert(row.getString("author").contains("Grandma")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId7)) + assert(returnedPartitioningParsed == expectedPartitioning7) + assert(row.getString("author").contains("Daughter")) + } + } + + test("Child Partitioning not found") { + val nonExistentID = 9999L + + function(getAncestorsFn) + .setParam("i_partitioning_id", nonExistentID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Child Partitioning not found")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } + + test("Flow not found") { + + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_FLOW")) + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + function(getAncestorsFn) + .setParam("i_partitioning_id", partId5) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Flow not found")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } + + test("Ancestor Partitioning not found") { + + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_Ancestor")) + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(getAncestorsFn) + .setParam("i_partitioning_id", partId5) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Ancestor Partitioning not found")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } +} diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala index 2042cf146..3df4f733d 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningMainFlowIntegrationTests.scala @@ -127,6 +127,8 @@ class GetPartitioningMainFlowIntegrationTests extends DBTestSuite { assert(row.getLong("id_flow").contains(1L)) assert(row.getString("flow_name").contains("Flow1")) assert(row.getString("created_by").contains("Sirius")) + print(row.getString("flow_name")) + assert(!queryResult.hasNext) } diff --git a/project/plugins.sbt b/project/plugins.sbt index 801174158..8a3070155 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -33,7 +33,7 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0") lazy val ow2Version = "9.5" lazy val jacocoVersion = "0.8.11-absa.1" -def jacocoUrl(artifactName: String): String = s"https://github.com/AbsaOSS/jacoco/releases/download/$jacocoVersion/org.jacoco.$artifactName-$jacocoVersion.jar" +def jacocoUrl(artifactName: String): String = s"file:///C:/Users/ABLL526/Documents/GitHub/Jar-Store2/org.jacoco.$artifactName-$jacocoVersion.jar" def ow2Url(artifactName: String): String = s"https://repo1.maven.org/maven2/org/ow2/asm/$artifactName/$ow2Version/$artifactName-$ow2Version.jar" addSbtPlugin("com.jsuereth" %% "scala-arm" % "2.0" from "https://repo1.maven.org/maven2/com/jsuereth/scala-arm_2.11/2.0/scala-arm_2.11-2.0.jar") @@ -46,4 +46,4 @@ addSbtPlugin("org.ow2.asm" % "asm" % ow2Version from ow2Url("asm")) addSbtPlugin("org.ow2.asm" % "asm-commons" % ow2Version from ow2Url("asm-commons")) addSbtPlugin("org.ow2.asm" % "asm-tree" % ow2Version from ow2Url("asm-tree")) -addSbtPlugin("za.co.absa.sbt" % "sbt-jacoco" % "3.4.1-absa.4" from "https://github.com/AbsaOSS/sbt-jacoco/releases/download/3.4.1-absa.4/sbt-jacoco-3.4.1-absa.4.jar") +addSbtPlugin("za.co.absa.sbt" % "sbt-jacoco" % "3.4.1-absa.4" from "file:///C:/Users/ABLL526/Documents/GitHub/Jar-Store2/sbt-jacoco-3.4.1-absa.3.jar") diff --git a/publish.sbt b/publish.sbt index 486192d52..9bd2dadf2 100644 --- a/publish.sbt +++ b/publish.sbt @@ -58,6 +58,12 @@ ThisBuild / developers := List( email = "tebalelo.sekhula@absa.africa", url = url("https://github.com/TebaleloS") ), + Developer( + id = "ABLL526", + name = "Liam Leibrandt", + email = "liam.leibrandt@absa.africa", + url = url("https://github.com/ABLL526") + ), Developer( id = "salamonpavel", name = "Pavel Salamon", diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index c4ac42381..548bbad26 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -66,6 +66,7 @@ object Main extends ZIOAppDefault with Server { GetPartitioning.layer, GetFlowPartitionings.layer, GetPartitioningMainFlow.layer, + GetAncestors.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 059b52bec..e82e52151 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -60,4 +60,11 @@ trait PartitioningController { def getPartitioningMainFlow( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[FlowDTO]] + + def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 33c842710..7cc4e0151 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -145,6 +145,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]]( + partitioningService.getAncestors(partitioningId, limit, offset) + ) + ) + } + } object PartitioningControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala new file mode 100644 index 000000000..d0790c3b1 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import io.circe.{DecodingFailure, Json} +import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningWithIdDTO} +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors._ +import za.co.absa.atum.server.model.PartitioningForDB +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} + +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet + +import scala.annotation.tailrec + +class GetAncestors(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[GetAncestorsArgs, Option[ + GetAncestorsResult + ], Task](args => + Seq( + fr"${args.partitioningId}", + fr"${args.limit}", + fr"${args.offset}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ancestor_id", "partitioning", "author", "has_more") +} + +object GetAncestors { + case class GetAncestorsArgs(partitioningId: Long, limit: Option[Int], offset: Option[Long]) + case class GetAncestorsResult(ancestor_id: Long, partitioningJson: Json, author: String, hasMore: Boolean) + + object GetAncestorsResult { + + @tailrec def resultsToPartitioningWithIdDTOs( + results: Seq[GetAncestorsResult], + acc: Seq[PartitioningWithIdDTO] + ): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = { + if (results.isEmpty) Right(acc) + else { + val head = results.head + val tail = results.tail + val decodingResult = head.partitioningJson.as[PartitioningForDB] + decodingResult match { + case Left(decodingFailure) => Left(decodingFailure) + case Right(partitioningForDB) => + val partitioningDTO = partitioningForDB.keys.map { key => + PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) + } + resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.ancestor_id, partitioningDTO, head.author)) + } + } + } + + } + + val layer: URLayer[PostgresDatabaseProvider, GetAncestors] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetAncestors()(Runs, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 66a4efad9..0b4c726f6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -173,6 +173,19 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(errorInDataOneOfVariant) } + protected val getAncestorsEndpointV2 + : PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[ + PartitioningWithIdDTO + ], Any] = { + apiV2.get + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Partitionings) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 825e3ce1f..0069a7b0e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -103,6 +103,16 @@ trait Routes extends Endpoints with ServerOptions { PartitioningController.getFlowPartitionings(flowId, limit, offset) } ), + createServerEndpoint[ + (Long, Option[Int], Option[Long]), + ErrorResponse, + PaginatedResponse[PartitioningWithIdDTO] + ]( + getAncestorsEndpointV2, + { case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getAncestors(partitioningId, limit, offset) + } + ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 45023a4e7..ecd109dab 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -57,5 +57,11 @@ trait PartitioningRepository { offset: Option[Long] ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] + def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] + def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 1f2505375..26d9f8507 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors._ import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.database.flows.functions._ @@ -39,6 +40,7 @@ class PartitioningRepositoryImpl ( getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById, getPartitioningFn: GetPartitioning, getFlowPartitioningsFn: GetFlowPartitionings, + getAncestorsFn: GetAncestors, getPartitioningMainFlowFn: GetPartitioningMainFlow ) extends PartitioningRepository with BaseRepository { @@ -158,6 +160,28 @@ class PartitioningRepositoryImpl ( } } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getAncestorsFn(GetAncestorsArgs(partitioningId, limit, offset)), + "getAncestors" + ).map(_.flatten) + .flatMap { partitioningResults => + ZIO + .fromEither(GetAncestorsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + partitionings => { + if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings) + else ResultNoMore(partitionings) + } + ) + } + } + override def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] = { dbSingleResultCallWithStatus( getPartitioningMainFlowFn(partitioningId), @@ -182,6 +206,7 @@ object PartitioningRepositoryImpl { with GetPartitioningMeasuresById with GetPartitioning with GetFlowPartitionings + with GetAncestors with GetPartitioningMainFlow, PartitioningRepository ] = ZLayer { @@ -197,6 +222,7 @@ object PartitioningRepositoryImpl { getPartitioning <- ZIO.service[GetPartitioning] getFlowPartitionings <- ZIO.service[GetFlowPartitionings] getPartitioningMainFlow <- ZIO.service[GetPartitioningMainFlow] + getAncestors <- ZIO.service[GetAncestors] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, createPartitioning, @@ -208,6 +234,7 @@ object PartitioningRepositoryImpl { getPartitioningMeasuresById, getPartitioning, getFlowPartitionings, + getAncestors, getPartitioningMainFlow ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 78aeb4ad2..f583e1e83 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -56,4 +56,10 @@ trait PartitioningService { ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] def getPartitioningMainFlow(partitioningId: Long): IO[ServiceError, FlowDTO] + + def getAncestors( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 3d9a46c26..5e16d4e6b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -113,6 +113,17 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] = { + repositoryCall( + partitioningRepository.getAncestors(partitioningId, limit, offset), + "getAncestors" + ) + } + } object PartitioningServiceImpl { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 7ac7ed79a..436ccd5fe 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -22,6 +22,7 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.{ResultValueType, dto} import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsResult import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, MeasureFromDB, PartitioningFromDB} import java.time.ZonedDateTime @@ -85,6 +86,27 @@ trait TestData { hasMore = true ) + protected val getAncestorsResult1: GetAncestorsResult = GetAncestorsResult( + ancestor_id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + + protected val getAncestorsResult2: GetAncestorsResult = GetAncestorsResult( + ancestor_id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = true + ) + + protected val getAncestorsResult3: GetAncestorsResult = GetAncestorsResult( + ancestor_id = 3333L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index de977bf87..8f33408e0 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -92,6 +92,13 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioningMainFlow(999L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getAncestors(1111L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getAncestors(8888L, Some(1), Some(0))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getAncestors(9999L, Some(1), Some(0))) + .thenReturn(ZIO.fail(NotFoundServiceError("Child Partitioning not found"))) + private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -231,6 +238,25 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with more data available") { + for { + result <- PartitioningController.getAncestors(1111L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = true), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected InternalServerErrorResponse when service call fails with GeneralServiceError") { + assertZIO(PartitioningController.getAncestors(8888L, Some(1), Some(0)).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + test("Returns expected NotFoundErrorResponse when service call fails with NotFoundServiceError") { + assertZIO(PartitioningController.getAncestors(9999L, Some(1), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } + ), suite("GetPartitioningMainFlowSuite")( test("Returns expected FlowDTO") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala new file mode 100644 index 000000000..631fe095b --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.{Scope, ZIO} +import zio.test.{Spec, TestEnvironment, assertTrue} +import zio.interop.catz.asyncInstance + +object GetAncestorsIntegrationTests extends ConfigProviderTest { + + override def spec: Spec[Unit with TestEnvironment with Scope, Any] = { + suite("GetAncestorsIntegrationTests")( + test("Retrieve Ancestors' partitions for a given id") { + val partitioningID: Long = 1111L + for { + getAncestors <- ZIO.service[GetAncestors] + result <- getAncestors(GetAncestorsArgs(partitioningID, None, None)) + + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + } + ) + }.provide( + GetAncestors.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala new file mode 100644 index 000000000..714ac4ac6 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{UriContext, basicRequest} +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.{Scope, ZIO, ZLayer} + +object GetAncestorsEndpointV2UnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.getAncestors(1111L, Some(1), Some(0))) + .thenReturn( + ZIO.succeed( + PaginatedResponse(Seq.empty, Pagination(1, 0, hasMore = true), uuid1) + ) + ) + when(partitioningControllerMock.getAncestors(8888L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + NotFoundErrorResponse("Child Partitioning not found") + ) + ) + when(partitioningControllerMock.getAncestors(9999L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + InternalServerErrorResponse("internal server error") + ) + ) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val getAncestorsServerEndpoint = + getAncestorsEndpointV2.zServerLogic({ case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getAncestors(partitioningId, limit: Option[Int], offset: Option[Long]) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) + .whenServerEndpoint(getAncestorsServerEndpoint) + .thenRunLogic() + .backend() + + suite("GetAncestorsEndpointSuite")( + test("Returns an expected PaginatedResponse") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/1111/partitionings?limit=1&offset=0") + .response(asJson[PaginatedResponse[PartitioningWithIdDTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq.empty[PartitioningWithIdDTO], Pagination(1, 0, hasMore = true), uuid1)), + StatusCode.Ok + ) + ) + }, + test("Returns a NotFoundErrorResponse") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/8888/partitionings?limit=1&offset=0") + .response(asJson[NotFoundErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns an InternalServerErrorResponse") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/9999/partitionings?limit=1&offset=0") + .response(asJson[InternalServerErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + } + ) + + }.provide(partitioningControllerMockLayer) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 9ee0e0c21..b3a0b43e9 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -21,6 +21,8 @@ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, Part import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.{GetAncestorsArgs, GetAncestorsResult} import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError @@ -115,6 +117,18 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningByIdByIdMockLayer = ZLayer.succeed(getPartitioningByIdByIdMock) + // Get Ancestors By Id Mocks + private val getAncestorsMock = mock(classOf[GetAncestors]) + + when(getAncestorsMock.apply(GetAncestorsArgs(2222L, Some(1), Some(1L))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1))))) + when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(1), Some(1L)))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(1), Some(1L))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + + private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock) + // GetPartitioningAdditionalDataV2 private val getPartitioningAdditionalDataV2Mock = mock(classOf[GetPartitioningAdditionalDataV2]) @@ -315,6 +329,23 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getAncestors(2222L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(9999L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(8888L, Some(1), Some(1L)).exit)( + failsWithA[GeneralDatabaseError] + ) + } + ), suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Seq") { for { @@ -407,6 +438,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, getPartitioningByIdByIdMockLayer, + getAncestorsMockLayer, getPartitioningAdditionalDataV2MockLayer, getPartitioningMockLayer, getPartitioningMeasuresV2MockLayer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index fb2e02559..965be3fa1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -73,6 +73,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningById(8888L)) .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + when(partitioningRepositoryMock.getAncestors(1111L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getAncestors(2222L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getAncestors(8888L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getAncestors(9999L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Child Partitioning not found"))) + when(partitioningRepositoryMock.getPartitioningMeasuresById(1L)) .thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) when(partitioningRepositoryMock.getPartitioningMeasuresById(2L)) @@ -226,6 +235,28 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getAncestors(1111L, Some(1), Some(1L)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected Right with ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getAncestors(2222L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected GeneralServiceError when database error occurs") { + assertZIO(PartitioningService.getAncestors(8888L, Some(1), Some(1L)).exit)( + failsWithA[GeneralServiceError] + ) + }, + test("Returns expected NotFoundServiceError when child partition does not exist") { + assertZIO(PartitioningService.getAncestors(9999L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundServiceError] + ) + } + ), suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Right with Seq[MeasureDTO]") { for { From 97c45811f28484b8c53782f6dbdfcc7882be7d22 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Wed, 13 Nov 2024 15:49:38 +0200 Subject: [PATCH 2/3] Updated Commit Updated commit, updated tests. --- .../co/absa/atum/server/api/http/Routes.scala | 36 +++-- .../za/co/absa/atum/server/api/TestData.scala | 9 +- .../GetAncestorsEndpointV2UnitTests.scala | 4 +- .../PartitioningRepositoryUnitTests.scala | 123 ++++++++---------- 4 files changed, 80 insertions(+), 92 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 0069a7b0e..d1ceba6a0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -25,10 +25,10 @@ import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} -import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse._ +import za.co.absa.atum.model.envelopes.SuccessResponse._ import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -57,7 +57,7 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(postPartitioningEndpointV2, PartitioningController.postPartitioning), createServerEndpoint( getPartitioningAdditionalDataEndpointV2, - PartitioningController.getPartitioningAdditionalDataV2 + PartitioningController.getPartitioningAdditionalData ), createServerEndpoint[ (Long, AdditionalDataPatchDTO), @@ -90,6 +90,14 @@ trait Routes extends Endpoints with ServerOptions { CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) } ), + createServerEndpoint[ + (Long, Option[Int], Option[Long], Option[String]), + ErrorResponse, + PaginatedResponse[CheckpointV2DTO] + ](getFlowCheckpointsEndpointV2, { + case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) + }), createServerEndpoint(getPartitioningByIdEndpointV2, PartitioningController.getPartitioningByIdV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), createServerEndpoint(getPartitioningMainFlowEndpointV2, PartitioningController.getPartitioningMainFlow), @@ -113,7 +121,7 @@ trait Routes extends Endpoints with ServerOptions { PartitioningController.getAncestors(partitioningId, limit, offset) } ), - createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) + createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.succeed(StatusResponse.up)) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes } @@ -127,14 +135,16 @@ trait Routes extends Endpoints with ServerOptions { // postCheckpointEndpointV2, createPartitioningEndpointV1, // postPartitioningEndpointV2, - // patchPartitioningAdditionalDataEndpointV2, + patchPartitioningAdditionalDataEndpointV2, // getPartitioningCheckpointsEndpointV2, // getPartitioningCheckpointEndpointV2, // getPartitioningMeasuresEndpointV2, - // getPartitioningEndpointV2, + getPartitioningEndpointV2, // getPartitioningMeasuresEndpointV2, // getFlowPartitioningsEndpointV2, - // getPartitioningMainFlowEndpointV2 + // getPartitioningMainFlowEndpointV2, + // getFlowCheckpointsEndpointV2, + healthEndpoint ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, "Atum API", "1.0")) @@ -149,16 +159,16 @@ trait Routes extends Endpoints with ServerOptions { } private def createServerEndpoint[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any], - logic: I => ZIO[HttpEnv.Env, E, O] - ): ZServerEndpoint[HttpEnv.Env, Any] = { + endpoint: PublicEndpoint[I, E, O, Any], + logic: I => ZIO[HttpEnv.Env, E, O] + ): ZServerEndpoint[HttpEnv.Env, Any] = { endpoint.zServerLogic(logic).widen[HttpEnv.Env] } protected def allRoutes( - httpMonitoringConfig: HttpMonitoringConfig, - jvmMonitoringConfig: JvmMonitoringConfig - ): HttpRoutes[HttpEnv.F] = { + httpMonitoringConfig: HttpMonitoringConfig, + jvmMonitoringConfig: JvmMonitoringConfig + ): HttpRoutes[HttpEnv.F] = { createAllServerRoutes(httpMonitoringConfig) <+> createSwaggerRoutes <+> (if (httpMonitoringConfig.enabled) http4sMetricsRoutes else HttpRoutes.empty[HttpEnv.F]) <+> diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 6a7050e98..33f88ce55 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -92,7 +92,7 @@ trait TestData { ) protected val getAncestorsResult1: GetAncestorsResult = GetAncestorsResult( - ancestor_id = 1111L, + ancestor_id = 1L, partitioningJson = partitioningAsJson, author = "author", hasMore = false @@ -105,13 +105,6 @@ trait TestData { hasMore = true ) - protected val getAncestorsResult3: GetAncestorsResult = GetAncestorsResult( - ancestor_id = 3333L, - partitioningJson = partitioningAsJson, - author = "author", - hasMore = false - ) - // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala index 714ac4ac6..d467befd6 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala @@ -24,10 +24,10 @@ import sttp.model.StatusCode import sttp.tapir.server.stub.TapirStubInterpreter import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.model.envelopes.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.controller.PartitioningController -import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} -import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import zio.test.Assertion.equalTo import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} import zio.{Scope, ZIO, ZLayer} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index b3a0b43e9..104fdc460 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -22,7 +22,7 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs import za.co.absa.atum.server.api.database.runs.functions.GetAncestors -import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.{GetAncestorsArgs, GetAncestorsResult} +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError @@ -34,7 +34,7 @@ import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, PartitioningForDB} +import za.co.absa.atum.server.model.{AdditionalDataItemFromDB, PartitioningForDB} object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -96,19 +96,10 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresMockLayer = ZLayer.succeed(getPartitioningMeasuresMock) - // Get Partitioning Additional Data Mocks - private val getPartitioningAdditionalDataMock = mock(classOf[GetPartitioningAdditionalData]) - - when(getPartitioningAdditionalDataMock.apply(partitioningDTO1)) - .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), AdditionalDataFromDB(Some("key"), Some("value")))))) - when(getPartitioningAdditionalDataMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(new Exception("boom!"))) - - private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) - // Get Partitioning By Id Mocks private val getPartitioningByIdByIdMock = mock(classOf[GetPartitioningById]) - when(getPartitioningByIdByIdMock.apply(1111L)) + when(getPartitioningByIdByIdMock.apply(1L)) .thenReturn(ZIO.right(Row(FunctionStatus(11, "OK"), Some(partitioningFromDB1)))) when(getPartitioningByIdByIdMock.apply(9999L)) .thenReturn(ZIO.fail(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) @@ -117,30 +108,18 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningByIdByIdMockLayer = ZLayer.succeed(getPartitioningByIdByIdMock) - // Get Ancestors By Id Mocks - private val getAncestorsMock = mock(classOf[GetAncestors]) - - when(getAncestorsMock.apply(GetAncestorsArgs(2222L, Some(1), Some(1L))) - ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1))))) - when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(1), Some(1L)))) - .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) - when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(1), Some(1L))) - ).thenReturn(ZIO.fail(new Exception("boom!"))) - - private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock) - - // GetPartitioningAdditionalDataV2 - private val getPartitioningAdditionalDataV2Mock = mock(classOf[GetPartitioningAdditionalDataV2]) + // GetPartitioningAdditionalData + private val getPartitioningAdditionalDataMock = mock(classOf[GetPartitioningAdditionalData]) - when(getPartitioningAdditionalDataV2Mock.apply(1L)).thenReturn( + when(getPartitioningAdditionalDataMock.apply(1L)).thenReturn( ZIO.right(Seq(Row(FunctionStatus(0, "success"), Some(AdditionalDataItemFromDB("key", Some("value"), "author"))))) ) - when(getPartitioningAdditionalDataV2Mock.apply(2L)).thenReturn(ZIO.fail(new Exception("boom!"))) - when(getPartitioningAdditionalDataV2Mock.apply(3L)).thenReturn( + when(getPartitioningAdditionalDataMock.apply(2L)).thenReturn(ZIO.fail(new Exception("boom!"))) + when(getPartitioningAdditionalDataMock.apply(3L)).thenReturn( ZIO.left(DataNotFoundException(FunctionStatus(41, "not found"))) ) - private val getPartitioningAdditionalDataV2MockLayer = ZLayer.succeed(getPartitioningAdditionalDataV2Mock) + private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) // GetPartitioningMeasuresById private val getPartitioningMeasuresV2Mock = mock(classOf[GetPartitioningMeasuresById]) @@ -170,16 +149,30 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings]) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(1L, Some(10), Some(0))) - ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(2L, Some(10), Some(0))) ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult2))))) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(0L, None, None))) .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(3L, Some(10), Some(0))) - ).thenReturn(ZIO.fail(new Exception("boom!"))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock) + // Get Ancestors By Id Mocks + private val getAncestorsMock = mock(classOf[GetAncestors]) + + when(getAncestorsMock.apply(GetAncestorsArgs(1L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1))))) + when(getAncestorsMock.apply(GetAncestorsArgs(1111L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult2))))) + when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(10), Some(0))) + ).thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(10), Some(0))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + + private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock) + // Create Partitioning Mocks private val getPartitioningMainFlowMock = mock(classOf[GetPartitioningMainFlow]) @@ -278,32 +271,20 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { } ), suite("GetPartitioningAdditionalDataSuite")( - test("Returns expected Right with Map") { - for { - result <- PartitioningRepository.getPartitioningAdditionalData(partitioningDTO1) - } yield assertTrue(result.get("key").contains(Some("value")) && result.size == 1) - }, - test("Returns expected Left with DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningAdditionalData(partitioningDTO2).exit)( - failsWithA[DatabaseError] - ) - } - ), - suite("GetPartitioningAdditionalDataV2Suite")( test("Returns expected AdditionalDataDTO instance") { for { - result <- PartitioningRepository.getPartitioningAdditionalDataV2(1L) + result <- PartitioningRepository.getPartitioningAdditionalData(1L) } yield assertTrue( result == AdditionalDataDTO(Map.from(Seq("key" -> Some(AdditionalDataItemDTO(Some("value"), "author"))))) ) }, test("Returns expected DatabaseError") { - assertZIO(PartitioningRepository.getPartitioningAdditionalDataV2(2L).exit)( + assertZIO(PartitioningRepository.getPartitioningAdditionalData(2L).exit)( failsWithA[GeneralDatabaseError] ) }, test("Returns expected NotFoundDatabaseError") { - assertZIO(PartitioningRepository.getPartitioningAdditionalDataV2(3L).exit)( + assertZIO(PartitioningRepository.getPartitioningAdditionalData(3L).exit)( failsWithA[NotFoundDatabaseError] ) } @@ -311,7 +292,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { suite("GetPartitioningByIdSuite")( test("GetPartitioningById - Returns expected PartitioningWithIdDTO") { for { - result <- PartitioningRepository.getPartitioningById(1111L) + result <- PartitioningRepository.getPartitioningById(1L) } yield assertTrue(result == partitioningWithIdDTO1) }, test("GetPartitioningById - Returns expected DataNotFoundException") { @@ -329,23 +310,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetAncestorsSuite")( - test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { - for { - result <- PartitioningRepository.getAncestors(2222L, Some(1), Some(1L)) - } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) - }, - test("Returns expected NotFoundDatabaseError") { - assertZIO(PartitioningRepository.getAncestors(9999L, Some(1), Some(1L)).exit)( - failsWithA[NotFoundDatabaseError] - ) - }, - test("Returns expected GeneralDatabaseError") { - assertZIO(PartitioningRepository.getAncestors(8888L, Some(1), Some(1L)).exit)( - failsWithA[GeneralDatabaseError] - ) - } - ), suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Seq") { for { @@ -399,7 +363,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { for { result <- PartitioningRepository.getFlowPartitionings(2L, Some(10), Some(0)) - } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO2))) }, test("Returns expected NotFoundDatabaseError") { assertZIO(PartitioningRepository.getFlowPartitionings(0L, None, None).exit)( @@ -412,6 +376,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getAncestors(1L, Some(10), Some(0)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getAncestors(1111L, Some(10), Some(0)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO2))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(9999L, Some(10), Some(0)).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getAncestors(8888L, Some(10), Some(0)).exit)( + failsWithA[GeneralDatabaseError] + ) + } + ), suite("GetPartitioningSuite")( test("GetPartitioning - Returns expected PartitioningWithIdDTO") { for { @@ -435,14 +421,13 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { createPartitioningIfNotExistsMockLayer, createPartitioningMockLayer, getPartitioningMeasuresMockLayer, - getPartitioningAdditionalDataMockLayer, createOrUpdateAdditionalDataMockLayer, getPartitioningByIdByIdMockLayer, - getAncestorsMockLayer, - getPartitioningAdditionalDataV2MockLayer, + getPartitioningAdditionalDataMockLayer, getPartitioningMockLayer, getPartitioningMeasuresV2MockLayer, getFlowPartitioningsMockLayer, + getAncestorsMockLayer, getPartitioningMainFlowMockLayer ) From e89a3303c959f2d93f0abc47902c7df547386532 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Thu, 28 Nov 2024 14:04:14 +0200 Subject: [PATCH 3/3] Added the patch parents functionality. Amended get ancestors functionality. --- .../V0.3.0.2__patch_partitioning_parent.sql | 98 ++++++ .../runs/GetAncestorsIntegrationTests.scala | 108 +++++++ .../runs/UpdateParentIntegrationTests.scala | 283 ++++++++++++++++++ .../atum/model/dto/ParentPatchV2DTO.scala | 29 ++ .../scala/za/co/absa/atum/server/Main.scala | 1 + .../controller/PartitioningController.scala | 6 + .../PartitioningControllerImpl.scala | 12 + .../functions/PatchPartitioningParent.scala | 47 +++ .../absa/atum/server/api/http/ApiPaths.scala | 1 + .../absa/atum/server/api/http/Endpoints.scala | 12 + .../co/absa/atum/server/api/http/Routes.scala | 12 +- .../repository/PartitioningRepository.scala | 6 + .../PartitioningRepositoryImpl.scala | 25 +- .../api/service/PartitioningService.scala | 6 + .../api/service/PartitioningServiceImpl.scala | 11 + .../za/co/absa/atum/server/api/TestData.scala | 9 + .../PartitioningControllerUnitTests.scala | 26 ++ ...PartitioningParentV2IntegrationTests.scala | 48 +++ ...artitioningParentV2EndpointUnitTests.scala | 101 +++++++ .../PartitioningRepositoryUnitTests.scala | 39 ++- .../PartitioningServiceUnitTests.scala | 24 ++ 21 files changed, 899 insertions(+), 5 deletions(-) create mode 100644 database/src/main/postgres/runs/V0.3.0.2__patch_partitioning_parent.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/UpdateParentIntegrationTests.scala create mode 100644 model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala diff --git a/database/src/main/postgres/runs/V0.3.0.2__patch_partitioning_parent.sql b/database/src/main/postgres/runs/V0.3.0.2__patch_partitioning_parent.sql new file mode 100644 index 000000000..6f2ac5b29 --- /dev/null +++ b/database/src/main/postgres/runs/V0.3.0.2__patch_partitioning_parent.sql @@ -0,0 +1,98 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION runs.patch_partitioning_parent( + IN i_partitioning_id BIGINT, + IN i_parent_id BIGINT, + IN i_by_user TEXT, + OUT status INTEGER, + OUT status_text TEXT, + OUT parent_id BIGINT +) RETURNS record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.patch_partitioning_parent(1, 2, user_1) +-- Returns Parents' partition ID for the given id. +-- Updates the Parent Partition for a given partition ID. + +-- +-- Parameters: +-- i_partitioning_id - id of the partition to be changed +-- i_parent_id - id of the new parent of the partition, +-- i_by_user - user behind the change +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- parent_id - ID of Parent partition + +-- Status codes: +-- 11 - OK +-- 41 - Partitioning not found +-- +------------------------------------------------------------------------------- +DECLARE + flow_id BIGINT[]; + mainFlow BIGINT; + var BIGINT; + +BEGIN + + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Child Partitioning not found'; + RETURN; + END IF; + + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_parent_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Parent Partitioning not found'; + RETURN; + END IF; + + SELECT F.id_flow as mainFlow + FROM runs.get_partitioning_main_flow(i_partitioning_id) AS F + INTO mainFlow; + + flow_id := array( + SELECT fk_flow AS flow_id + FROM flows.partitioning_to_flow + WHERE fk_partitioning = i_partitioning_id + AND fk_flow != mainFlow + ); + + FOREACH var IN ARRAY flow_id LOOP + DELETE FROM flows.partitioning_to_flow AS PTF + WHERE PTF.fk_partitioning = i_partitioning_id + AND PTF.fk_flow = var; + END LOOP; + + PERFORM 1 FROM flows._add_to_parent_flows(i_parent_id, i_partitioning_id, i_by_user); + status := 11; + status_text := 'Parent Updated'; + parent_id := i_parent_id; + + RETURN; + +END; +$$ + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.patch_partitioning_parent(BIGINT, BIGINT, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.patch_partitioning_parent(BIGINT, BIGINT, TEXT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala index ad1848163..f825a68e7 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -24,6 +24,8 @@ class GetAncestorsIntegrationTests extends DBTestSuite { private val getAncestorsFn = "runs.get_ancestors" private val partitioningsTable = "runs.partitionings" + private val updateParentFn = "runs.patch_partitioning_parent" + private val createPartitioningFn = "runs.create_partitioning" private val createFlowFn = "flows._create_flow" private val addToParentFlowsFn = "flows._add_to_parent_flows" @@ -427,6 +429,112 @@ class GetAncestorsIntegrationTests extends DBTestSuite { } } + test("Change in Parent") { + + println("IDS:") + + val partId1 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning1) + .setParam("i_by_user", "GrandPa") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partId2 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning2) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partId3 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning3) + .setParam("i_parent_partitioning_id", partId1) + .setParam("i_by_user", "Father") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + println(partId1) + println(partId2) + println(partId3) + + println("OLD:") + function(getAncestorsFn) + .setParam("i_partitioning_id", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("GrandPa")) + println(partId3) + } + + println("COMPLETE") + println("NEW:") + + function(updateParentFn) + .setParam("i_partitioning_id", partId3) + .setParam("i_parent_id", partId2) + .setParam("i_by_user", "Happy Nappy") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(row.getLong("parent_id").contains(partId2)) + println(partId2) + assert(!queryResult.hasNext) + } + + println("COMPLETE") + println("EXTRA CHECK") + table("flows.partitioning_to_flow").all() { partToFlowResult => + while (partToFlowResult.hasNext) { + val partToFlowRow = partToFlowResult.next() + val result = partToFlowRow.getLong("fk_flow") + val fk_partitioning = partToFlowRow.getLong("fk_partitioning") + println() + println("flow: " + result) + println("part ID:" + fk_partitioning) + } + } + + function(getAncestorsFn) + .setParam("i_partitioning_id", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("GrandMa")) + println(partId3) + print(partId2) + } + println("COMPLETE") + } + test("Child Partitioning not found") { val nonExistentID = 9999L diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/UpdateParentIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/UpdateParentIntegrationTests.scala new file mode 100644 index 000000000..24a062b0c --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/UpdateParentIntegrationTests.scala @@ -0,0 +1,283 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +class UpdateParentIntegrationTests extends DBTestSuite { + + private val updateParentFn = "runs.patch_partitioning_parent" + private val createPartitioningFn = "runs.create_partitioning" + + private val partitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValuesMap": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValuesMap": { + | "key1": "valueA", + | "key2": "valueB", + | "key3": "valueC", + | "key4": "valueD" + | } + |} + |""".stripMargin + ) + + private val parentPartitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3"], + | "keysToValuesMap": { + | "key1": "valueX", + | "key3": "valueZ" + | } + |} + |""".stripMargin + ) + + private val parentPartitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3"], + | "keysToValuesMap": { + | "key1": "valueW", + | "key3": "valueY" + | } + |} + |""".stripMargin + ) + + test("Child Partitioning not found") { + val nonExistentID = 9999L + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + function(updateParentFn) + .setParam("i_partitioning_id", nonExistentID) + .setParam("i_parent_id", parentPartitioningID) + .setParam("i_by_user", "Fantômas") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Child Partitioning not found")) + assert(row.getJsonB("parent_id").isEmpty) + assert(!queryResult.hasNext) + } + + } + + test("Parent Partitioning not found") { + val nonExistentID = 9999L + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partitioningID = function(createPartitioningFn) + .setParam("i_partitioning", partitioning) + .setParam("i_by_user", "Fantômas") + .setParam("i_parent_partitioning_id", parentPartitioningID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + function(updateParentFn) + .setParam("i_partitioning_id", partitioningID) + .setParam("i_parent_id", nonExistentID) + .setParam("i_by_user", "Fantômas") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Parent Partitioning not found")) + assert(row.getJsonB("parent_id").isEmpty) + assert(!queryResult.hasNext) + } + + } + + test("Parent Partitioning Updated") { + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val parentPartitioningID2 = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning2) + .setParam("i_by_user", "Tomas Riddle") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partitioningID = function(createPartitioningFn) + .setParam("i_partitioning", partitioning) + .setParam("i_by_user", "Fantômas") + .setParam("i_parent_partitioning_id", parentPartitioningID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + function(updateParentFn) + .setParam("i_partitioning_id", partitioningID) + .setParam("i_parent_id", parentPartitioningID2) + .setParam("i_by_user", "Happy Nappy") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(row.getLong("parent_id").contains(parentPartitioningID2)) + assert(!queryResult.hasNext) + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID2)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2 + ) + + function(updateParentFn) + .setParam("i_partitioning_id", partitioningID) + .setParam("i_parent_id", parentPartitioningID) + .setParam("i_by_user", "Happy Nappy") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(row.getLong("parent_id").contains(parentPartitioningID)) + assert(!queryResult.hasNext) + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID2)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2 + ) + + var partitioningID2 = 0L; + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID2)) == 0 + ) + + partitioningID2 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning2) + .setParam("i_by_user", "Tomas Riddler") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID2)) == 1 + ) + + function(updateParentFn) + .setParam("i_partitioning_id", partitioningID2) + .setParam("i_parent_id", parentPartitioningID) + .setParam("i_by_user", "Happy Nappy") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(row.getLong("parent_id").contains(parentPartitioningID)) + assert(!queryResult.hasNext) + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID2)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID2)) == 2 + ) + + } +} diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala new file mode 100644 index 000000000..f89ddb81e --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.dto + +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +case class ParentPatchV2DTO( + parentPartitioningId: Long, +) + +object ParentPatchV2DTO { + implicit val decoderFlowDTO: Decoder[ParentPatchV2DTO] = deriveDecoder + implicit val encoderFlowDTO: Encoder[ParentPatchV2DTO] = deriveEncoder +} diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index a46604ef4..eaf386a0f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -67,6 +67,7 @@ object Main extends ZIOAppDefault with Server { GetFlowPartitionings.layer, GetPartitioningMainFlow.layer, GetAncestors.layer, + PatchPartitioningParent.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index 9e348e5f9..346be0d78 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -32,6 +32,12 @@ trait PartitioningController { partitioningSubmitDTO: PartitioningSubmitV2DTO ): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)] + def patchPartitioningParentV2( + partitioningId: Long, + parentPartitioningID: Long, + byUser: String + ): IO[ErrorResponse, SingleSuccessResponse[ParentPatchV2DTO]] + def getPartitioningAdditionalData( partitioningId: Long ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index ac6a1e23c..85548fa07 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -99,6 +99,18 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) } yield (response, uri) } + override def patchPartitioningParentV2( + partitioningId: Long, + parentPartitioningID: Long, + byUser: String + ): IO[ErrorResponse,SingleSuccessResponse[ParentPatchV2DTO]] = { + mapToSingleSuccessResponse( + serviceCall[ParentPatchV2DTO, ParentPatchV2DTO]( + partitioningService.patchPartitioningParent(partitioningId, parentPartitioningID, byUser) + ) + ) + } + override def patchPartitioningAdditionalDataV2( partitioningId: Long, additionalDataPatchDTO: AdditionalDataPatchDTO diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala new file mode 100644 index 000000000..92c91a77e --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import za.co.absa.atum.model.dto.{FlowDTO, ParentPatchV2DTO} +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs +import za.co.absa.atum.server.api.database.runs.functions.PatchPartitioningParent.PatchPartitioningParentArgs +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} + +class PatchPartitioningParent(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieSingleResultFunctionWithStatus[PatchPartitioningParentArgs, Option[ParentPatchV2DTO], Task](input => Seq(fr"$input")) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = + super.fieldsToSelect ++ Seq("parent_id") +} + +object PatchPartitioningParent { + case class PatchPartitioningParentArgs(partitioningId: Long, parentPartitioningID: Long, byUser: String) + val layer: URLayer[PostgresDatabaseProvider, PatchPartitioningParent] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new PatchPartitioningParent()(Runs, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala index 3e5903082..e751245be 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala @@ -40,6 +40,7 @@ object ApiPaths { final val Flows = "flows" final val Measures = "measures" final val MainFlow = "main-flow" + final val Parents = "parents" } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index f59c95efe..0b774c137 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -76,6 +76,18 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(conflictErrorOneOfVariant) } + protected val patchPartitioningParentEndpointV2 + : PublicEndpoint[(Long, Long, String), ErrorResponse, SingleSuccessResponse[ + ParentPatchV2DTO + ], Any] = { + apiV2.get + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Parents / path[Long]("partitioningId") / V2Paths.Parents ) + .in(query[String]("byUser")) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[SingleSuccessResponse[ParentPatchV2DTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val getPartitioningAdditionalDataEndpointV2 : PublicEndpoint[Long, ErrorResponse, SingleSuccessResponse[AdditionalDataDTO], Any] = { apiV2.get diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index d1ceba6a0..05b13f872 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -24,7 +24,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, ParentPatchV2DTO, PartitioningWithIdDTO} import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} @@ -121,6 +121,16 @@ trait Routes extends Endpoints with ServerOptions { PartitioningController.getAncestors(partitioningId, limit, offset) } ), + createServerEndpoint[ + (Long, Long, String), + ErrorResponse, + SingleSuccessResponse[ParentPatchV2DTO] + ]( + patchPartitioningParentEndpointV2, + { case (partitioningId: Long, parentPartitioningId: Long, byUser: String) => + PartitioningController.patchPartitioningParentV2(partitioningId, parentPartitioningId, byUser) + } + ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.succeed(StatusResponse.up)) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 2817eed07..741a048a2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -62,4 +62,10 @@ trait PartitioningRepository { ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] + + def patchPartitioningParent( + partitioningId: Long, + parentPartitioningID: Long, + byUser: String + ): IO[DatabaseError, ParentPatchV2DTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index b0ed132e0..85318ca0b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -28,6 +28,7 @@ import zio._ import zio.interop.catz.asyncInstance import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError import PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.api.database.runs.functions.PatchPartitioningParent.PatchPartitioningParentArgs class PartitioningRepositoryImpl( createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists, @@ -40,7 +41,8 @@ class PartitioningRepositoryImpl( getPartitioningFn: GetPartitioning, getFlowPartitioningsFn: GetFlowPartitionings, getAncestorsFn: GetAncestors, - getPartitioningMainFlowFn: GetPartitioningMainFlow + getPartitioningMainFlowFn: GetPartitioningMainFlow, + patchPartitioningParentFn: PatchPartitioningParent ) extends PartitioningRepository with BaseRepository { @@ -182,6 +184,20 @@ class PartitioningRepositoryImpl( } } + override def patchPartitioningParent( + partitioningId: Long, + parentPartitioningID: Long, + byUser: String + ): IO[DatabaseError, ParentPatchV2DTO] = { + dbSingleResultCallWithStatus( + patchPartitioningParentFn(PatchPartitioningParentArgs(partitioningId, parentPartitioningID, byUser)), + "patchPartitioningParent" + ).flatMap { + case Some(parentPatchV2DTO) => ZIO.succeed(parentPatchV2DTO) + case None => ZIO.fail(GeneralDatabaseError("Unexpected error.")) + } + } + } object PartitioningRepositoryImpl { @@ -196,7 +212,8 @@ object PartitioningRepositoryImpl { with GetPartitioning with GetFlowPartitionings with GetAncestors - with GetPartitioningMainFlow, + with GetPartitioningMainFlow + with PatchPartitioningParent, PartitioningRepository ] = ZLayer { for { @@ -211,6 +228,7 @@ object PartitioningRepositoryImpl { getFlowPartitionings <- ZIO.service[GetFlowPartitionings] getPartitioningMainFlow <- ZIO.service[GetPartitioningMainFlow] getAncestors <- ZIO.service[GetAncestors] + patchPartitioningParent <- ZIO.service[PatchPartitioningParent] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, createPartitioning, @@ -222,7 +240,8 @@ object PartitioningRepositoryImpl { getPartitioning, getFlowPartitionings, getAncestors, - getPartitioningMainFlow + getPartitioningMainFlow, + patchPartitioningParent ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 9c8e13c26..885f9425e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -30,6 +30,12 @@ trait PartitioningService { partitioningSubmitDTO: PartitioningSubmitV2DTO ): IO[ServiceError, PartitioningWithIdDTO] + def patchPartitioningParent( + partitioningId: Long, + parentPartitioningID: Long, + byUser: String + ): IO[ServiceError, ParentPatchV2DTO] + def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]] def getPartitioningAdditionalData(partitioningId: Long): IO[ServiceError, AdditionalDataDTO] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 26a999643..a8a691071 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -42,6 +42,17 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def patchPartitioningParent( + partitioningId: Long, + parentPartitioningID: Long, + byUser: String + ): IO[ServiceError, ParentPatchV2DTO] = { + repositoryCall( + partitioningRepository.patchPartitioningParent(partitioningId, parentPartitioningID, byUser), + "patchPartitioningParent" + ) + } + override def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]] = { repositoryCall( partitioningRepository.getPartitioningMeasures(partitioning), diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 33f88ce55..c51a6cb6b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -136,6 +136,15 @@ trait TestData { protected val partitioningSubmitV2DTO3: PartitioningSubmitV2DTO = partitioningSubmitV2DTO1.copy(author = "yetAnotherAuthor") + + protected val parentPatchV2DTO01: ParentPatchV2DTO = ParentPatchV2DTO( + parentPartitioningId = 1111L + ) + + protected val parentPatchV2DTO02: ParentPatchV2DTO = ParentPatchV2DTO( + parentPartitioningId = 2222L + ) + // Flow protected val flowDTO1: FlowDTO = FlowDTO( id = 1L, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index b1c548e2d..71d080f58 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -90,6 +90,13 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioningMainFlow(999L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.patchPartitioningParent(111L, 1111L, "Jack")) + .thenReturn(ZIO.succeed(parentPatchV2DTO01)) + when(partitioningServiceMock.patchPartitioningParent(222L, 2222L, "Jill")) + .thenReturn(ZIO.fail(NotFoundServiceError("not found"))) + when(partitioningServiceMock.patchPartitioningParent(999L, 2222L, "Bean")) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getAncestors(1111L, Some(1), Some(0))) .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) when(partitioningServiceMock.getAncestors(8888L, Some(1), Some(0))) @@ -273,6 +280,25 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { failsWithA[InternalServerErrorResponse] ) } + ), + suite("PatchParentSuite")( + test("Returns expected ParentPatchV2DTO") { + for { + result <- PartitioningController.patchPartitioningParentV2(111L, 1111L, "Jack") + expected = SingleSuccessResponse(parentPatchV2DTO01, uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected NotFoundErrorResponse") { + assertZIO(PartitioningController.patchPartitioningParentV2(222L, 2222L, "Jill").exit)( + failsWithA[NotFoundErrorResponse] + ) + }, + test("Returns expected InternalServerErrorResponse") { + assertZIO(PartitioningController.patchPartitioningParentV2(999L, 2222L, "Bean").exit)( + failsWithA[InternalServerErrorResponse] + ) + } ) ).provide( PartitioningControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala new file mode 100644 index 000000000..127bfbfd3 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala @@ -0,0 +1,48 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs +import za.co.absa.atum.server.api.database.runs.functions.PatchPartitioningParent.PatchPartitioningParentArgs +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio._ +import zio.interop.catz.asyncInstance +import zio.test._ + +object PatchPartitioningParentV2IntegrationTests extends ConfigProviderTest { + override def spec: Spec[TestEnvironment with Scope, Any] = { + suite("PatchPartitioningParentSuite")( + test("Returns expected PatchParentV2DTO with provided partitioning") { + val partitioningId: Long = 999L + val parentPartitioningId: Long = 1111L + val byUser: String = "Jack" + for { + patchPartitioningParent <- ZIO.service[PatchPartitioningParent] + result <- patchPartitioningParent(PatchPartitioningParentArgs(partitioningId, parentPartitioningId, byUser)) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + } + ).provide( + PatchPartitioningParent.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + } +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala new file mode 100644 index 000000000..f998ad3fc --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala @@ -0,0 +1,101 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.{FlowDTO, ParentPatchV2DTO} +import za.co.absa.atum.model.envelopes.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import zio.test.Assertion.equalTo +import sttp.client3.{UriContext, basicRequest} +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.{Scope, ZIO, ZLayer} + +object PatchPartitioningParentV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.patchPartitioningParentV2(111L, 1111L, "Jack")) + .thenReturn( + ZIO.succeed( + SingleSuccessResponse(parentPatchV2DTO01, uuid1) + ) + ) + when(partitioningControllerMock.patchPartitioningParentV2(222L, 2222L, "Jack")) + .thenReturn( + ZIO.fail( + NotFoundErrorResponse("Child Partitioning not found") + ) + ) + when(partitioningControllerMock.patchPartitioningParentV2(9999L, 9999L, "Bean")) + .thenReturn( + ZIO.fail( + InternalServerErrorResponse("internal server error") + ) + ) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val patchPartitioningParentServerEndpoint = + patchPartitioningParentEndpointV2.zServerLogic({ case (partitioningId: Long, parentPartitioningId: Long, byUser: String) => + PartitioningController.patchPartitioningParentV2(partitioningId, parentPartitioningId: Long, byUser: String) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) + .whenServerEndpoint(patchPartitioningParentServerEndpoint) + .thenRunLogic() + .backend() + + suite("PatchPartitioningParentSuite")( + test("Returns an expected ParentPatchV2DTO") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/111/parents/1111/parents?byUser=Jack") + .response(asJson[SingleSuccessResponse[ParentPatchV2DTO]]) + + val response = request.send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo(Right(SingleSuccessResponse(parentPatchV2DTO01, uuid1)), StatusCode.Ok) + ) + }, + test("Returns expected 404 when partitioning not found for a given id") { + val request = basicRequest + .get(uri"https://test.com/api/v2/partitionings/222/partitionings?parentPartitioningId=1111L&byUser=Jack") + .response(asJson[SingleSuccessResponse[ParentPatchV2DTO]]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + } + ) + }.provide(partitioningControllerMockLayer) +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 104fdc460..6ac684294 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -24,6 +24,7 @@ import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings. import za.co.absa.atum.server.api.database.runs.functions.GetAncestors import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs +import za.co.absa.atum.server.api.database.runs.functions.PatchPartitioningParent.PatchPartitioningParentArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError._ @@ -186,6 +187,19 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMainFlowMockLayer = ZLayer.succeed(getPartitioningMainFlowMock) + // Create Partitioning Mocks + private val patchPartitioningParentMock = mock(classOf[PatchPartitioningParent]) + + when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(111L, 1111L, "Jack"))).thenReturn( + ZIO.right(Row(FunctionStatus(11, "OK"), Some(parentPatchV2DTO01)))) + when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(222L, 2222L, "Jill"))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found")))) + when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(333L, 3333L, "Bean"))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Parent Partitioning not found")))) + when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(444L, 1111L, "Bean"))).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + + private val patchPartitioningParentMockLayer = ZLayer.succeed(patchPartitioningParentMock) + override def spec: Spec[TestEnvironment with Scope, Any] = { suite("PartitioningRepositorySuite")( @@ -354,6 +368,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("PatchPartitioningParentSuite")( + test("Returns expected parentPatchV2DTO") { + for { + result <- PartitioningRepository.patchPartitioningParent(111L, 1111L, "Jack") + } yield assertTrue(result == parentPatchV2DTO01) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.patchPartitioningParent(222L, 2222L, "Jill").exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.patchPartitioningParent(333L, 3333L, "Bean").exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.patchPartitioningParent(444L, 1111L, "Bean").exit)( + failsWithA[GeneralDatabaseError] + ) + } + ), suite("GetFlowPartitioningsSuite")( test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { for { @@ -428,7 +464,8 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMeasuresV2MockLayer, getFlowPartitioningsMockLayer, getAncestorsMockLayer, - getPartitioningMainFlowMockLayer + getPartitioningMainFlowMockLayer, + patchPartitioningParentMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index e5d4d7ecd..2975b344a 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -107,6 +107,13 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningMainFlow(3L)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.patchPartitioningParent(111L, 1111L, "Jack")) + .thenReturn(ZIO.succeed(parentPatchV2DTO01)) + when(partitioningRepositoryMock.patchPartitioningParent(222L, 2222L, "Jill")) + .thenReturn(ZIO.fail(NotFoundDatabaseError("boom!"))) + when(partitioningRepositoryMock.patchPartitioningParent(333L, 3333L, "Bean")) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -293,6 +300,23 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("PatchPartitioningParent")( + test("Returns expected Right with ParentPatchV2DTO") { + for { + result <- PartitioningService.patchPartitioningParent(111L, 1111L, "Jack") + } yield assertTrue(result == parentPatchV2DTO01) + }, + test("Returns expected ServiceError") { + assertZIO(PartitioningService.patchPartitioningParent(222L, 2222L, "Jill").exit)( + failsWithA[NotFoundServiceError] + ) + }, + test("Returns expected ServiceError") { + assertZIO(PartitioningService.patchPartitioningParent(333L, 3333L, "Bean").exit)( + failsWithA[GeneralServiceError] + ) + } + ), suite("GetFlowPartitioningsSuite")( test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { for {