Skip to content

Commit 7b1e511

Browse files
committed
#23 Create a utils class that will help users manage their properties
1 parent 456c654 commit 7b1e511

File tree

13 files changed

+83
-58
lines changed

13 files changed

+83
-58
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
topicName = "KillMePleaseTopic"
3+
4+
writer {
5+
bootstrap.servers = "ZADALNRAPP00009.corp.dsarena.com:9092"
6+
key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
7+
value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
8+
}
9+
10+
reader {
11+
bootstrap.servers = "ZADALNRAPP00009.corp.dsarena.com:9092"
12+
key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
13+
value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
14+
client.id = "DebugConsumer_007"
15+
group.id = "DebugGroup_007"
16+
auto.offset.reset = "earliest"
17+
}
18+
}

examples/src/main/scala-2/za.co.absa.kafkacase.examples/reader/UsingsResourceHandling.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package za.co.absa.kafkacase.examples.reader
1818

19+
import com.typesafe.config.Config
1920
import io.circe.Decoder
20-
import java.util.Properties
2121

2222
object UsingsResourceHandling {
23-
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
23+
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
2424
println("Scala 3 feature")
2525
}
2626
}

examples/src/main/scala-2/za.co.absa.kafkacase.examples/writer/UsingsResourceHandling.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package za.co.absa.kafkacase.examples.writer
1818

19+
import com.typesafe.config.Config
1920
import io.circe.Encoder
20-
import java.util.Properties
2121

2222
object UsingsResourceHandling {
23-
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
23+
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
2424
println("Scala 3 feature")
2525
}
2626
}

examples/src/main/scala-3/za/co/absa/kafkacase/examples/reader/UsingsResourceHandling.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ package za.co.absa.kafkacase.examples.reader
1919
import io.circe.Decoder
2020
import za.co.absa.kafkacase.reader.ReaderImpl
2121

22-
import java.util.Properties
2322
import scala.util.Using
2423

2524
object UsingsResourceHandling {
26-
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
27-
Using(new ReaderImpl[T](readerProps, topicName, neverEnding = false)) { reader =>
25+
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
26+
Using(new ReaderImpl[T](readerConf, topicName, neverEnding = false)) { reader =>
2827
for (item <- reader)
2928
println(item)
3029
}

examples/src/main/scala-3/za/co/absa/kafkacase/examples/writer/UsingsResourceHandling.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ package za.co.absa.kafkacase.examples.writer
1919
import io.circe.Encoder
2020
import za.co.absa.kafkacase.writer.WriterImpl
2121

22-
import java.util.Properties
2322
import scala.util.Using
2423

2524
object UsingsResourceHandling {
26-
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
27-
Using(new WriterImpl[T](writerProps, topicName)) { writer =>
25+
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
26+
Using(new WriterImpl[T](writerConf, topicName)) { writer =>
2827
writer.Write("sampleMessageKey1", sampleMessageToWrite)
2928
writer.Write("sampleMessageKey2", sampleMessageToWrite)
3029
}

examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
package za.co.absa.kafkacase.examples
1818

19-
import org.apache.kafka.clients.consumer.ConsumerConfig
20-
import org.apache.kafka.clients.producer.ProducerConfig
19+
import com.typesafe.config.ConfigFactory
2120
import za.co.absa.kafkacase.models.topics.EdlaChange
2221

23-
import java.util.{Properties, UUID}
24-
2522
object KafkaCase {
23+
private val config = ConfigFactory.load()
24+
2625
// This goes from your application logic
2726
private val sampleMessageToWrite = EdlaChange(
2827
app_id_snow = "N/A",
@@ -37,31 +36,12 @@ object KafkaCase {
3736
timestamp_event = 12345
3837
)
3938

40-
// This goes from your config / domain knowledge
41-
private val topicName = "KillMePleaseTopic"
42-
43-
// This goes from your writer configs
44-
private val writerProps = new Properties()
45-
writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
46-
writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
47-
writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
48-
49-
// This goes from your reader configs
50-
private val readerProps = new Properties()
51-
readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
52-
readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
53-
readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
54-
readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
55-
readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
56-
readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
57-
58-
5939
def main(args: Array[String]): Unit = {
60-
writer.ManualResourceHandling(writerProps, topicName, sampleMessageToWrite)
61-
writer.CustomResourceHandling(writerProps, topicName, sampleMessageToWrite)
62-
writer.UsingsResourceHandling(writerProps, topicName, sampleMessageToWrite)
63-
reader.ManualResourceHandling[EdlaChange](readerProps, topicName)
64-
reader.CustomResourceHandling[EdlaChange](readerProps, topicName)
65-
reader.UsingsResourceHandling[EdlaChange](readerProps, topicName)
40+
writer.ManualResourceHandling(config.getConfig("writer"), config.getString("topicName"), sampleMessageToWrite)
41+
writer.CustomResourceHandling(config.getConfig("writer"), config.getString("topicName"), sampleMessageToWrite)
42+
writer.UsingsResourceHandling(config.getConfig("writer"), config.getString("topicName"), sampleMessageToWrite)
43+
reader.ManualResourceHandling[EdlaChange](config.getConfig("reader"), config.getString("topicName"))
44+
reader.CustomResourceHandling[EdlaChange](config.getConfig("reader"), config.getString("topicName"))
45+
reader.UsingsResourceHandling[EdlaChange](config.getConfig("reader"), config.getString("topicName"))
6646
}
6747
}

examples/src/main/scala/za/co/absa/kafkacase/examples/reader/CustomResourceHandling.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616

1717
package za.co.absa.kafkacase.examples.reader
1818

19+
import com.typesafe.config.Config
1920
import io.circe.Decoder
2021
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
2122
import za.co.absa.kafkacase.reader.ReaderImpl
2223

23-
import java.util.Properties
24-
2524
object CustomResourceHandling {
26-
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
27-
withResource(new ReaderImpl[T](readerProps, topicName, neverEnding = false))(reader => {
25+
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
26+
withResource(new ReaderImpl[T](readerConf, topicName, neverEnding = false))(reader => {
2827
for (item <- reader)
2928
println(item)
3029
})

examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ManualResourceHandling.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616

1717
package za.co.absa.kafkacase.examples.reader
1818

19+
import com.typesafe.config.Config
1920
import io.circe.Decoder
2021
import za.co.absa.kafkacase.reader.ReaderImpl
2122

22-
import java.util.Properties
23-
2423
object ManualResourceHandling {
25-
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = {
26-
val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false)
24+
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
25+
val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false)
2726
try {
2827
for (item <- reader)
2928
println(item)

examples/src/main/scala/za/co/absa/kafkacase/examples/writer/CustomResourceHandling.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,16 @@
1616

1717
package za.co.absa.kafkacase.examples.writer
1818

19+
import com.typesafe.config.Config
1920
import io.circe.Encoder
2021
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
2122
import za.co.absa.kafkacase.writer.WriterImpl
2223

2324
import java.util.Properties
2425

2526
object CustomResourceHandling {
26-
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
27-
withResource(new WriterImpl[T](writerProps, topicName))(writer => {
27+
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
28+
withResource(new WriterImpl[T](writerConf, topicName))(writer => {
2829
writer.Write("sampleMessageKey1", sampleMessageToWrite)
2930
writer.Write("sampleMessageKey2", sampleMessageToWrite)
3031
})

examples/src/main/scala/za/co/absa/kafkacase/examples/writer/ManualResourceHandling.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616

1717
package za.co.absa.kafkacase.examples.writer
1818

19+
import com.typesafe.config.Config
1920
import io.circe.Encoder
2021
import za.co.absa.kafkacase.writer.WriterImpl
2122

22-
import java.util.Properties
23-
2423
object ManualResourceHandling {
25-
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = {
26-
val writer = new WriterImpl[T](writerProps, topicName)
24+
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit = {
25+
val writer = new WriterImpl[T](writerConf, topicName)
2726
try {
2827
writer.Write("sampleMessageKey1", sampleMessageToWrite)
2928
writer.Write("sampleMessageKey2", sampleMessageToWrite)

0 commit comments

Comments
 (0)