
Commit 7b11e9d

#17 Resource handling helper classes
1 parent 029ca61 commit 7b11e9d

2 files changed: 143 additions & 42 deletions


examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala

Lines changed: 81 additions & 42 deletions
@@ -14,72 +14,111 @@
  * limitations under the License.
  */
 
-package za.co.absa.kafkacase.examples
+package za.co.absa.KafkaCase.Examples
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import za.co.absa.kafkacase.models.topics.EdlaChange
+import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
 import za.co.absa.kafkacase.reader.ReaderImpl
 import za.co.absa.kafkacase.writer.WriterImpl
 
 import java.util.{Properties, UUID}
+// scala3 only
+// import scala.util.Using
 
 object KafkaCase {
-  private def writer_use_case(): Unit = {
-    // 0 -> HAVE SOMETHING TO WRITE
-    val messageToWrite = EdlaChange(
-      app_id_snow = "N/A",
-      data_definition_id = "TestingThis",
-      environment = "DEV",
-      format = "FooBar",
-      guid = "DebugId",
-      location = "ether",
-      operation = EdlaChange.Operation.Create(),
-      schema_link = "http://not.here",
-      source_app = "ThisCode",
-      timestamp_event = 12345
-    )
-
-    // 1 -> DEFINE PROPS - kafka to treat all as string
-    val writerProps = new Properties()
-    writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
-    writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-    writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-
-    // 2 -> MAKE WRITER
-    val writer = new WriterImpl[EdlaChange](writerProps, "KillMePleaseTopic")
+  // This goes from your application logic
+  private val sampleMessageToWrite = EdlaChange(
+    app_id_snow = "N/A",
+    data_definition_id = "TestingThis",
+    environment = "DEV",
+    format = "FooBar",
+    guid = "DebugId",
+    location = "ether",
+    operation = EdlaChange.Operation.Create(),
+    schema_link = "http://not.here",
+    source_app = "ThisCode",
+    timestamp_event = 12345
+  )
+
+  // This goes from your config / domain knowledge
+  private val topicName = "KillMePleaseTopic"
+
+  // This goes from your writer configs
+  private val writerProps = new Properties()
+  writerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
+  writerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+  writerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+
+  // This goes from your reader configs
+  private val readerProps = new Properties()
+  readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
+  readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
+  readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
+  readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
+  readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
+  readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+  private def writer_use_case_scala2(): Unit = {
+    val writer = new WriterImpl[EdlaChange](writerProps, topicName)
     try {
-      // 3 -> WRITE
-      writer.Write("sampleMessageKey", messageToWrite)
+      writer.Write("sampleMessageKey1", sampleMessageToWrite)
+      writer.Write("sampleMessageKey2", sampleMessageToWrite)
     } finally {
-      // Releasing resources should be handled by using block in newer versions of scala
       writer.close()
     }
   }
 
-  private def reader_use_case(): Unit = {
-    // 1 -> DEFINE PROPS - kafka to treat all as string
-    val readerProps = new Properties()
-    readerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ZADALNRAPP00009.corp.dsarena.com:9092")
-    readerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
-    readerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
-    readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, s"DebugConsumer_${UUID.randomUUID()}")
-    readerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"DebugGroup_${UUID.randomUUID()}")
-    readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-    // 2 -> MAKE READER (should be in using block for newer versions of scala)
-    val reader = new ReaderImpl[EdlaChange](readerProps, "KillMePleaseTopic")
+  private def writer_use_case_scala2_custom_resource_handler(): Unit = {
+    withResource(new WriterImpl[EdlaChange](writerProps, topicName))(writer => {
+      writer.Write("sampleMessageKey1", sampleMessageToWrite)
+      writer.Write("sampleMessageKey2", sampleMessageToWrite)
+    })
+  }
+
+  // scala3 only
+  // private def writer_use_case_scala3(): Unit = {
+  //   Using(new WriterImpl[EdlaChange](writerProps, topicName)) { writer =>
+  //     writer.Write("sampleMessageKey1", sampleMessageToWrite)
+  //     writer.Write("sampleMessageKey2", sampleMessageToWrite)
+  //   }
+  // }
+
+  private def reader_use_case_scala2(): Unit = {
+    val reader = new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false)
     try {
       for (item <- reader)
         println(item)
     } finally {
-      // Releasing resources should be handled by using block in newer versions of scala
       reader.close()
     }
   }
 
+  private def reader_use_case_scala2_custom_resource_handler(): Unit = {
+    withResource(new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false))(reader => {
+      for (item <- reader)
+        println(item)
+    })
+  }
+
+  // scala3 only
+  // private def reader_use_case_scala3(): Unit = {
+  //   Using(new ReaderImpl[EdlaChange](readerProps, topicName, neverEnding = false)) { reader =>
+  //     for (item <- reader)
+  //       println(item)
+  //   }
+  // }
+
   def main(args: Array[String]): Unit = {
-    writer_use_case()
-    reader_use_case()
+    writer_use_case_scala2()
+    reader_use_case_scala2()
+
+    writer_use_case_scala2_custom_resource_handler()
+    reader_use_case_scala2_custom_resource_handler()
+
+    // scala3 only
+    // writer_use_case_scala3()
+    // reader_use_case_scala3()
  }
 }
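A note on the commented-out "scala3 only" variants in the diff above: scala.util.Using is part of the standard library from Scala 2.13 onward and returns a scala.util.Try, so a failure inside the block is captured in the Try rather than rethrown. Below is a minimal sketch of how the writer example could surface such a failure; the method name writer_use_case_scala3_checked is hypothetical, while WriterImpl, writerProps, topicName and sampleMessageToWrite are taken from the file above.

// Sketch only (Scala 2.13+/3): Using returns a Try, matched on explicitly here,
// and the writer is closed in both the Success and Failure cases.
import scala.util.{Failure, Success, Using}

private def writer_use_case_scala3_checked(): Unit = {
  Using(new WriterImpl[EdlaChange](writerProps, topicName)) { writer =>
    writer.Write("sampleMessageKey1", sampleMessageToWrite)
    writer.Write("sampleMessageKey2", sampleMessageToWrite)
  } match {
    case Success(_)  => println("messages written, writer closed")
    case Failure(ex) => println(s"write failed: ${ex.getMessage}")
  }
}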
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2024 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.kafkacase.models.utils
+
+import scala.util.control.NonFatal
+
+// Inspired by https://dkomanov.medium.com/scala-try-with-resources-735baad0fd7d
+// and for-comprehension contract by ATUM's ARMImplits
+object ResourceHandler {
+  def withResource[TResource <: AutoCloseable, TResult](resourceFactory: => TResource)(operation: TResource => TResult): TResult = {
+    val resource: TResource = resourceFactory
+    require(resource != null, "resource is null")
+    var exception: Throwable = null
+    try {
+      operation(resource)
+    } catch {
+      case NonFatal(ex) =>
+        exception = ex
+        throw ex
+    } finally {
+      closeAndAddSuppressed(exception, resource)
+    }
+  }
+
+  private def closeAndAddSuppressed(ex: Throwable, resource: AutoCloseable): Unit = {
+    if (ex != null) {
+      try {
+        resource.close()
+      } catch {
+        case NonFatal(suppressed) =>
+          ex.addSuppressed(suppressed)
+      }
+    } else {
+      resource.close()
+    }
+  }
+
+  // implementing a for-comprehension contract inspired by ATUM's ARMImplits
+  implicit class ResourceWrapper[TResource <: AutoCloseable](resourceFactory: => TResource) {
+    def foreach(operation: TResource => Unit): Unit = withResource(resourceFactory)(operation)
+
+    def map[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation)
+
+    def flatMap[TResult](operation: TResource => TResult): TResult = withResource(resourceFactory)(operation)
+
+    def withFilter(ignored: TResource => Boolean): ResourceWrapper[TResource] = this
+  }
+}
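The ResourceWrapper implicit class added above is not exercised by the updated example file, so here is a hedged usage sketch of the for-comprehension contract it provides. WriterImpl, writerProps, topicName and sampleMessageToWrite are borrowed from the example file in this commit; everything else is illustrative.

// Sketch only: the import brings the implicit conversion to ResourceWrapper into scope,
// so the for-comprehension desugars to ResourceWrapper.foreach, i.e. withResource,
// and the writer is closed whether the body completes or throws.
import za.co.absa.kafkacase.models.utils.ResourceHandler.ResourceWrapper

for (writer <- new WriterImpl[EdlaChange](writerProps, topicName)) {
  writer.Write("sampleMessageKey1", sampleMessageToWrite)
  writer.Write("sampleMessageKey2", sampleMessageToWrite)
}

The reader, by contrast, is itself iterated with a for-comprehension in the example (so it already exposes its own foreach over messages), which is why the sketch uses the writer side.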
