diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index cd103ce..122d07a 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -26,16 +26,17 @@ object KafkaCase { // This goes from your application logic private val sampleMessageToWrite = EdlaChange( - app_id_snow = "N/A", - data_definition_id = "TestingThis", - environment = "DEV", - format = "FooBar", - guid = "DebugId", - location = "ether", - operation = EdlaChange.Operation.Create(), - schema_link = "http://not.here", - source_app = "ThisCode", - timestamp_event = 12345 + event_id = "SampleId", + tenant_id = "NoOneImportant", + source_app = "SampleApp", + source_app_version = "v over 9000", + environment = "Example", + timestamp_event = 12345, + catalog_id = "ExamplesCatalog", + operation = EdlaChange.Operation.Overwrite(), + location = "ScalaSamples", + format = "SampleFormat", + formatOptions = Map("ExtraOption" -> "nope") ) def main(args: Array[String]): Unit = { diff --git a/models/src/main/scala/za/co/absa/kafkacase/models/topics/EdlaChange.scala b/models/src/main/scala/za/co/absa/kafkacase/models/topics/EdlaChange.scala index ded5399..215b518 100644 --- a/models/src/main/scala/za/co/absa/kafkacase/models/topics/EdlaChange.scala +++ b/models/src/main/scala/za/co/absa/kafkacase/models/topics/EdlaChange.scala @@ -21,36 +21,40 @@ import io.circe.generic.JsonCodec @JsonCodec case class EdlaChange( - app_id_snow: String, - data_definition_id: String, + event_id: String, + tenant_id: String, + source_app: String, + source_app_version: String, environment: String, - format: String, - guid: String, - location: String, + timestamp_event: Long, + catalog_id: String, operation: EdlaChange.Operation, - schema_link: String, - source_app: String, - timestamp_event: Long + location: String, + format: String, + formatOptions: Map[String, String] ) object EdlaChange { sealed trait Operation object Operation { - case class Create() extends Operation - case class Update() extends Operation + case class Overwrite() extends Operation + case class Append() extends Operation case class Archive() extends Operation + case class Delete() extends Operation implicit val operationEncoder: Encoder[Operation] = Encoder.encodeString.contramap[Operation] { - case Create() => s"CREATE" - case Update() => s"UPDATE" - case Archive() => s"ARCHIVE" + case Overwrite() => s"overwrite" + case Append() => s"append" + case Archive() => s"archive" + case Delete() => s"delete" } implicit val operationDecoder: Decoder[Operation] = Decoder.decodeString.emap { - case "CREATE" => Right(Create()) - case "UPDATE" => Right(Update()) - case "ARCHIVE" => Right(Archive()) + case "overwrite" => Right(Overwrite()) + case "append" => Right(Append()) + case "archive" => Right(Archive()) + case "delete" => Right(Delete()) } } } diff --git a/models/src/main/scala/za/co/absa/kafkacase/models/topics/SchemaRun.scala b/models/src/main/scala/za/co/absa/kafkacase/models/topics/Run.scala similarity index 60% rename from models/src/main/scala/za/co/absa/kafkacase/models/topics/SchemaRun.scala rename to models/src/main/scala/za/co/absa/kafkacase/models/topics/Run.scala index f1ab0d8..ba3b649 100644 --- a/models/src/main/scala/za/co/absa/kafkacase/models/topics/SchemaRun.scala +++ b/models/src/main/scala/za/co/absa/kafkacase/models/topics/Run.scala @@ -18,39 +18,51 @@ package za.co.absa.kafkacase.models.topics import io.circe.{Decoder, Encoder} import io.circe.generic.JsonCodec +import za.co.absa.kafkacase.models.topics.Run.Job @JsonCodec -case class SchemaRun( - app_id_snow: String, - data_definition_id: String, - environment: String, - guid: String, +case class Run( + event_id: String, job_ref: String, - message: String, + tenant_id: String, source_app: String, - status: SchemaRun.Status, + source_app_version: String, + environment: String, + timestamp_start: Long, timestamp_end: Long, - timestamp_start: Long + jobs: Seq[Job] ) -object SchemaRun { +object Run { + @JsonCodec + case class Job( + catalog_id: String, + status: Status, + timestamp_start: Long, + timestamp_end: Long, + message: String + ) + sealed trait Status object Status { - case class Finished() extends Status + case class Succeeded() extends Status case class Failed() extends Status case class Killed() extends Status + case class Skipped() extends Status implicit val operationEncoder: Encoder[Status] = Encoder.encodeString.contramap[Status] { - case Finished() => s"Finished" - case Failed() => s"Failed" - case Killed() => s"Killed" + case Succeeded() => s"succeeded" + case Failed() => s"failed" + case Killed() => s"killed" + case Skipped() => s"skipped" } implicit val operationDecoder: Decoder[Status] = Decoder.decodeString.emap { - case "Finished" => Right(Finished()) - case "Failed" => Right(Failed()) - case "Killed" => Right(Killed()) + case "succeeded" => Right(Succeeded()) + case "failed" => Right(Failed()) + case "killed" => Right(Killed()) + case "skipped" => Right(Skipped()) } } } diff --git a/models/src/test/scala/za/co/absa/kafkacase/models/topics/EdlaChangeUnitTests.scala b/models/src/test/scala/za/co/absa/kafkacase/models/topics/EdlaChangeUnitTests.scala index 341c786..7897831 100644 --- a/models/src/test/scala/za/co/absa/kafkacase/models/topics/EdlaChangeUnitTests.scala +++ b/models/src/test/scala/za/co/absa/kafkacase/models/topics/EdlaChangeUnitTests.scala @@ -22,30 +22,34 @@ import org.scalatest.funsuite.AnyFunSuite class EdlaChangeUnitTests extends AnyFunSuite { private val instance = EdlaChange( - app_id_snow = "N/A", - data_definition_id = "TestingThis", - environment = "DEV", - format = "FooBar", - guid = "DebugId", - location = "ether", - operation = EdlaChange.Operation.Create(), - schema_link = "http://not.here", - source_app = "ThisCode", - timestamp_event = 12345 + event_id = "TestEventId", + tenant_id = "TestTenantId", + source_app = "TestSrc", + source_app_version = "v9000", + environment = "UnitTestEnv", + timestamp_event = 12345, + catalog_id = "TestCatalog", + operation = EdlaChange.Operation.Delete(), + location = "UnitTest", + format = "TestFormat", + formatOptions = Map("Foo" -> "Bar") ) private val json = """{ - | "app_id_snow" : "N/A", - | "data_definition_id" : "TestingThis", - | "environment" : "DEV", - | "format" : "FooBar", - | "guid" : "DebugId", - | "location" : "ether", - | "operation" : "CREATE", - | "schema_link" : "http://not.here", - | "source_app" : "ThisCode", - | "timestamp_event" : 12345 + | "event_id" : "TestEventId", + | "tenant_id" : "TestTenantId", + | "source_app" : "TestSrc", + | "source_app_version" : "v9000", + | "environment" : "UnitTestEnv", + | "timestamp_event" : 12345, + | "catalog_id" : "TestCatalog", + | "operation" : "delete", + | "location" : "UnitTest", + | "format" : "TestFormat", + | "formatOptions" : { + | "Foo" : "Bar" + | } |}""".stripMargin test("Serializes to JSON properly") { diff --git a/models/src/test/scala/za/co/absa/kafkacase/models/topics/RunUnitTests.scala b/models/src/test/scala/za/co/absa/kafkacase/models/topics/RunUnitTests.scala new file mode 100644 index 0000000..54c607a --- /dev/null +++ b/models/src/test/scala/za/co/absa/kafkacase/models/topics/RunUnitTests.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.models.topics + +import io.circe.jawn.decode +import io.circe.syntax.EncoderOps +import org.scalatest.funsuite.AnyFunSuite + +class RunUnitTests extends AnyFunSuite { + private val instance = Run( + event_id = "TestId", + job_ref = "TestJob", + tenant_id = "TestTenant", + source_app = "UnitTestSrc", + source_app_version = "v9000", + environment = "UnitTestEnv", + timestamp_start = 12345, + timestamp_end = 6789, + jobs = Seq(Run.Job( + catalog_id = "FooId", + status = Run.Status.Killed(), + timestamp_start = 12346, + timestamp_end = 6788, + message = "TestingLikeCrazy" + )) + ) + + private val json = + """{ + | "event_id" : "TestId", + | "job_ref" : "TestJob", + | "tenant_id" : "TestTenant", + | "source_app" : "UnitTestSrc", + | "source_app_version" : "v9000", + | "environment" : "UnitTestEnv", + | "timestamp_start" : 12345, + | "timestamp_end" : 6789, + | "jobs" : [ + | { + | "catalog_id" : "FooId", + | "status" : "killed", + | "timestamp_start" : 12346, + | "timestamp_end" : 6788, + | "message" : "TestingLikeCrazy" + | } + | ] + |}""".stripMargin + + test("Serializes to JSON properly") { + assertResult(json)(instance.asJson.toString()) + } + + test("Deserializes from JSON properly") { + assertResult(instance)(decode[Run](json).getOrElse(throw new Exception("Failed to parse JSON"))) + } +} diff --git a/models/src/test/scala/za/co/absa/kafkacase/models/topics/SchemaRunUnitTests.scala b/models/src/test/scala/za/co/absa/kafkacase/models/topics/SchemaRunUnitTests.scala deleted file mode 100644 index 86a3e6b..0000000 --- a/models/src/test/scala/za/co/absa/kafkacase/models/topics/SchemaRunUnitTests.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2024 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.kafkacase.models.topics - -import io.circe.jawn.decode -import io.circe.syntax.EncoderOps -import org.scalatest.funsuite.AnyFunSuite - -class SchemaRunUnitTests extends AnyFunSuite { - private val instance = SchemaRun( - app_id_snow = "N/A", - data_definition_id = "Foo", - environment = "TEST", - guid = "DebugId", - job_ref = "UnitTestJob", - message = "FooBar", - source_app = "ThisTest", - status = SchemaRun.Status.Killed(), - timestamp_end = 67890, - timestamp_start = 12345 - ) - - private val json = - """{ - | "app_id_snow" : "N/A", - | "data_definition_id" : "Foo", - | "environment" : "TEST", - | "guid" : "DebugId", - | "job_ref" : "UnitTestJob", - | "message" : "FooBar", - | "source_app" : "ThisTest", - | "status" : "Killed", - | "timestamp_end" : 67890, - | "timestamp_start" : 12345 - |}""".stripMargin - - test("Serializes to JSON properly") { - assertResult(json)(instance.asJson.toString()) - } - - test("Deserializes from JSON properly") { - assertResult(instance)(decode[SchemaRun](json).getOrElse(throw new Exception("Failed to parse JSON"))) - } -}