Skip to content

Commit

Permalink
#1692 RunServiceV3 has custom validation - it checks the reference'd …
Browse files Browse the repository at this point in the history
…dataset existence for run (a top of uniqueness) - reflected in IT
  • Loading branch information
dk1844 committed May 30, 2022
1 parent b30646e commit 4d39116
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
def create(newRun: Run, username: String, retriesLeft: Int = 3): Future[Run] = {
for {
latestOpt <- runMongoRepository.getLatestRun(newRun.dataset, newRun.datasetVersion)
run <- getRunIdentifiersIfAbsent(newRun, username, latestOpt)
run <- getRunIdentifiersIfAbsent(newRun, username, latestOpt) // adds uniqueId, replaces runId
validation <- validate(run)
createdRun <-
if (validation.isValid) {
Expand Down Expand Up @@ -203,7 +203,7 @@ class RunService @Autowired()(val mongoRepository: RunMongoRepository)
}
}

private def validateUniqueId(run: Run): Future[Validation] = {
protected def validateUniqueId(run: Run): Future[Validation] = {
val validation = Validation()

run.uniqueId match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,33 @@ package za.co.absa.enceladus.rest_api.services.v3

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import za.co.absa.enceladus.model.{Run, Validation}
import za.co.absa.enceladus.rest_api.models.RunSummary
import za.co.absa.enceladus.rest_api.repositories.v3.RunMongoRepositoryV3
import za.co.absa.enceladus.rest_api.services.RunService

import scala.concurrent.Future

@Service
class RunServiceV3 @Autowired()(override val mongoRepository: RunMongoRepositoryV3)
class RunServiceV3 @Autowired()(override val mongoRepository: RunMongoRepositoryV3, datasetServiceV3: DatasetServiceV3)
extends RunService(mongoRepository) {

import scala.concurrent.ExecutionContext.Implicits.global

override def validate(run: Run): Future[Validation] = {
for {
uniqueness <- validateUniqueId(run)
dsExistence <- validateDatasetExists(run.dataset, run.datasetVersion)
} yield uniqueness.merge(dsExistence)
}

protected def validateDatasetExists(datasetName: String, datasetVersion: Int): Future[Validation] = {
datasetServiceV3.getVersion(datasetName, datasetVersion).map {
case None => Validation.empty.withError("dataset", s"Dataset $datasetName v$datasetVersion not found!")
case Some(_) => Validation.empty
}
}

/**
* Yields Latest-of-each run summaries (grouped by datasetName, datasetVersion).
* Optionally filtered by one of `startDate` (>=)|`sparkAppId`(==)|`uniqueId`(==)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.junit4.SpringRunner
import za.co.absa.atum.model.{Checkpoint, ControlMeasure, RunState, RunStatus}
import za.co.absa.atum.utils.SerializationUtils
import za.co.absa.enceladus.model.test.factories.RunFactory
import za.co.absa.enceladus.model.test.factories.{DatasetFactory, RunFactory}
import za.co.absa.enceladus.model.{Run, SplineReference, Validation}
import za.co.absa.enceladus.rest_api.integration.controllers.BaseRestApiTestV3
import za.co.absa.enceladus.rest_api.integration.fixtures.{FixtureService, RunFixtureService}
import za.co.absa.enceladus.rest_api.integration.fixtures.{DatasetFixtureService, FixtureService, RunFixtureService}
import za.co.absa.enceladus.rest_api.models.{RunDatasetNameGroupedSummary, RunDatasetVersionGroupedSummary, RunSummary}

import java.util.UUID
Expand All @@ -37,13 +37,15 @@ import java.util.UUID
@ActiveProfiles(Array("withEmbeddedMongo"))
class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {

import za.co.absa.enceladus.model.Validation._
import za.co.absa.enceladus.rest_api.integration.RunImplicits.RunExtensions

@Autowired
private val runFixture: RunFixtureService = null

override def fixtures: List[FixtureService[_]] = List(runFixture)
@Autowired
private val datasetFixture: DatasetFixtureService = null

override def fixtures: List[FixtureService[_]] = List(runFixture, datasetFixture)

private val apiUrl = "/runs"

Expand Down Expand Up @@ -261,6 +263,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
s"POST $apiUrl/{datasetName}/{datasetVersion}" can {
"return 201" when {
"new Run is created (basic case)" in {
datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset")) // dataset ref'd by the run
val run = RunFactory.getDummyRun()

val response = sendPost[Run, Run](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
Expand All @@ -270,6 +273,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
assert(body == run)
}
"created run provides a uniqueId if none is specified" in {
datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
val run = RunFactory.getDummyRun(uniqueId = None)
val response = sendPost[Run, Run](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))

Expand All @@ -281,6 +285,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
assert(body == expected)
}
"created run generates a runId=1 for the first run" in {
datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
val run = RunFactory.getDummyRun(runId = 123) // specified runId is ignored

val response = sendPost[Run, Run](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
Expand All @@ -291,6 +296,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
assert(body == run.copy(runId = 1)) // no runs present, so runId = 1
}
"created run generates a subsequent runId" in {
datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
runFixture.add(
RunFactory.getDummyRun(runId = 1),
RunFactory.getDummyRun(runId = 2)
Expand All @@ -305,6 +311,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
assert(body == run.copy(runId = 3)) // runs 1, and 2 were already presents
}
"handles two runs being started simultaneously" in {
datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
val run1 = RunFactory.getDummyRun()
val run2 = RunFactory.getDummyRun()
run1.uniqueId should not be run2.uniqueId
Expand Down Expand Up @@ -348,6 +355,7 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
response2.getBody should include("URL and payload entity version mismatch: 2 != 3")
}
"a Run with the given uniqueId already exists" in {
datasetFixture.add(DatasetFactory.getDummyDataset("dummyDataset"))
val uniqueId = "ed9fd163-f9ac-46f8-9657-a09a4e3fb6e9"
val presentRun = RunFactory.getDummyRun(uniqueId = Option(uniqueId))
runFixture.add(presentRun)
Expand All @@ -361,7 +369,15 @@ class RunControllerV3IntegrationSuite extends BaseRestApiTestV3 with Matchers {
assert(!body.isValid)
assert(body == Validation().withError("uniqueId", s"run with this uniqueId already exists: $uniqueId"))
}
// todo non-existent dataset/version failing
"a Run references a non-existing dataset" in {
// dataset ref'd by the run does not exits
val run = RunFactory.getDummyRun()

val response = sendPost[Run, Validation](s"$apiUrl/dummyDataset/1", bodyOpt = Option(run))
response.getStatusCode shouldBe HttpStatus.BAD_REQUEST

response.getBody shouldBe Validation.empty.withError("dataset", "Dataset dummyDataset v1 not found!")
}
}
}

Expand Down

0 comments on commit 4d39116

Please sign in to comment.