Skip to content

Commit f596158

Browse files
authored
Feature/#15 sync schema with event gate (#16)
1 parent dc9c5f1 commit f596158

File tree

7 files changed

+58
-57
lines changed

7 files changed

+58
-57
lines changed

examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@ object KafkaCase {
2828
private def writer_use_case(): Unit = {
2929
// 0 -> HAVE SOMETHING TO WRITE
3030
val messageToWrite = EdlaChangeTopic(
31-
id = "DebugId",
3231
app_id_snow = "N/A",
33-
source_app = "ThisCode",
32+
data_definition_id = "TestingThis",
3433
environment = "DEV",
35-
timestamp_event = 12345,
36-
data_definition = "TestingThis",
37-
operation = EdlaChangeTopic.Operation.CREATE(),
38-
location = "ether",
3934
format = "FooBar",
40-
schema_link = "http://not.here"
35+
guid = "DebugId",
36+
location = "ether",
37+
operation = EdlaChangeTopic.Operation.CREATE(),
38+
schema_link = "http://not.here",
39+
source_app = "ThisCode",
40+
timestamp_event = 12345
4141
)
4242

4343
// 1 -> DEFINE PROPS - kafka to treat all as string

models/src/main/scala/za/co/absa/KafkaCase/Models/EdlaChangeTopic.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ import io.circe.generic.JsonCodec
2121

2222
@JsonCodec
2323
case class EdlaChangeTopic(
24-
id: String,
2524
app_id_snow: String,
26-
source_app: String,
25+
data_definition_id: String,
2726
environment: String,
28-
timestamp_event: Long,
29-
data_definition: String,
30-
operation: EdlaChangeTopic.Operation,
31-
location: String,
3227
format: String,
33-
schema_link: String
28+
guid: String,
29+
location: String,
30+
operation: EdlaChangeTopic.Operation,
31+
schema_link: String,
32+
source_app: String,
33+
timestamp_event: Long
3434
)
3535

3636
object EdlaChangeTopic {

models/src/main/scala/za/co/absa/KafkaCase/Models/SchemaRunTopic.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ import io.circe.generic.JsonCodec
2121

2222
@JsonCodec
2323
case class SchemaRunTopic(
24-
id: String,
25-
job_ref: String,
2624
app_id_snow: String,
27-
source_app: String,
28-
environment: String,
29-
timestamp_start: Long,
30-
timestamp_end: Long,
3125
data_definition_id: String,
26+
environment: String,
27+
guid: String,
28+
job_ref: String,
29+
message: String,
30+
source_app: String,
3231
status: SchemaRunTopic.Status,
33-
message: String
32+
timestamp_end: Long,
33+
timestamp_start: Long
3434
)
3535

3636
object SchemaRunTopic {

models/src/test/scala/za/co/absa/KafkaCase/Models/EdlaChangeTopicUnitTests.scala

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,38 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.KafkaCase.models
17+
package za.co.absa.KafkaCase.Models
1818

1919
import io.circe.jawn.decode
2020
import io.circe.syntax.EncoderOps
2121
import org.scalatest.funsuite.AnyFunSuite
22-
import za.co.absa.KafkaCase.Models.EdlaChangeTopic
2322

2423
class EdlaChangeTopicUnitTests extends AnyFunSuite {
2524
private val instance = EdlaChangeTopic(
26-
id = "DebugId",
2725
app_id_snow = "N/A",
28-
source_app = "ThisCode",
26+
data_definition_id = "TestingThis",
2927
environment = "DEV",
30-
timestamp_event = 12345,
31-
data_definition = "TestingThis",
32-
operation = EdlaChangeTopic.Operation.CREATE(),
33-
location = "ether",
3428
format = "FooBar",
35-
schema_link = "http://not.here"
29+
guid = "DebugId",
30+
location = "ether",
31+
operation = EdlaChangeTopic.Operation.CREATE(),
32+
schema_link = "http://not.here",
33+
source_app = "ThisCode",
34+
timestamp_event = 12345
3635
)
3736

3837
private val json =
3938
"""{
40-
| "id" : "DebugId",
4139
| "app_id_snow" : "N/A",
42-
| "source_app" : "ThisCode",
40+
| "data_definition_id" : "TestingThis",
4341
| "environment" : "DEV",
44-
| "timestamp_event" : 12345,
45-
| "data_definition" : "TestingThis",
46-
| "operation" : "CREATE",
47-
| "location" : "ether",
4842
| "format" : "FooBar",
49-
| "schema_link" : "http://not.here"
43+
| "guid" : "DebugId",
44+
| "location" : "ether",
45+
| "operation" : "CREATE",
46+
| "schema_link" : "http://not.here",
47+
| "source_app" : "ThisCode",
48+
| "timestamp_event" : 12345
5049
|}""".stripMargin
5150

5251
test("Serializes to JSON properly") {

models/src/test/scala/za/co/absa/KafkaCase/Models/SchemaRunTopicUnitTests.scala

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,38 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.KafkaCase.models
17+
package za.co.absa.KafkaCase.Models
1818

1919
import io.circe.jawn.decode
2020
import io.circe.syntax.EncoderOps
2121
import org.scalatest.funsuite.AnyFunSuite
22-
import za.co.absa.KafkaCase.Models.SchemaRunTopic
2322

2423
class SchemaRunTopicUnitTests extends AnyFunSuite {
2524
private val instance = SchemaRunTopic(
26-
id = "DebugId",
27-
job_ref = "UnitTestJob",
2825
app_id_snow = "N/A",
29-
source_app = "ThisTest",
30-
environment = "TEST",
31-
timestamp_start = 12345,
32-
timestamp_end = 67890,
3326
data_definition_id = "Foo",
27+
environment = "TEST",
28+
guid = "DebugId",
29+
job_ref = "UnitTestJob",
30+
message = "FooBar",
31+
source_app = "ThisTest",
3432
status = SchemaRunTopic.Status.Killed(),
35-
message = "FooBar"
33+
timestamp_end = 67890,
34+
timestamp_start = 12345
3635
)
3736

3837
private val json =
3938
"""{
40-
| "id" : "DebugId",
41-
| "job_ref" : "UnitTestJob",
4239
| "app_id_snow" : "N/A",
43-
| "source_app" : "ThisTest",
44-
| "environment" : "TEST",
45-
| "timestamp_start" : 12345,
46-
| "timestamp_end" : 67890,
4740
| "data_definition_id" : "Foo",
41+
| "environment" : "TEST",
42+
| "guid" : "DebugId",
43+
| "job_ref" : "UnitTestJob",
44+
| "message" : "FooBar",
45+
| "source_app" : "ThisTest",
4846
| "status" : "Killed",
49-
| "message" : "FooBar"
47+
| "timestamp_end" : 67890,
48+
| "timestamp_start" : 12345
5049
|}""".stripMargin
5150

5251
test("Serializes to JSON properly") {

reader/src/main/scala/za/co/absa/KafkaCase/Reader/Reader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616

1717
package za.co.absa.KafkaCase.Reader
1818

19-
trait Reader[TType] extends Iterator[(String, TType)] with AutoCloseable
19+
trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoCloseable

reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,16 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura
3333

3434
override def hasNext: Boolean = singlePollIterator.hasNext
3535

36-
override def next(): (String, TType) = {
36+
override def next(): (String, Either[String, TType]) = {
3737
log.info("Fetching next item")
3838
val nextItem = singlePollIterator.next()
39-
val nextItemTyped = decode[TType](nextItem.value()).getOrElse(throw new Exception(s"Cannot parse $nextItem"))
4039
if (!singlePollIterator.hasNext)
4140
singlePollIterator = fetchNextBatch()
42-
nextItem.key() -> nextItemTyped
41+
val nextItemMaybeTyped = decode[TType](nextItem.value()) match {
42+
case Left(_) => Left(s"Cannot parse ${nextItem.value()}")
43+
case Right(item) => Right(item)
44+
}
45+
nextItem.key() -> nextItemMaybeTyped
4346
}
4447

4548
def close(): Unit = consumer.close()

0 commit comments

Comments
 (0)