From d5571c50ccd7fdf0bcc69333daf860949e20dc67 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Wed, 16 Oct 2024 10:54:38 +0200 Subject: [PATCH 1/3] #24 Add once off Reader and Writer function --- .../absa/kafkacase/examples/KafkaCase.scala | 2 ++ .../kafkacase/examples/reader/ReadOnce.scala | 27 +++++++++++++++++++ .../kafkacase/examples/writer/WriteOnce.scala | 27 +++++++++++++++++++ .../za/co/absa/kafkacase/reader/Reader.scala | 16 +++++++++++ .../za/co/absa/kafkacase/writer/Writer.scala | 15 +++++++++++ 5 files changed, 87 insertions(+) create mode 100644 examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala create mode 100644 examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index d8d6236..e54d79f 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -62,8 +62,10 @@ object KafkaCase { WriterManualResourceHandling(writerProps, topicName, sampleMessageToWrite) WriterCustomResourceHandling(writerProps, topicName, sampleMessageToWrite) WriterUsingsResourceHandling(writerProps, topicName, sampleMessageToWrite) + WriterWriteOnce(writerProps, topicName, sampleMessageToWrite) ReaderManualResourceHandling[EdlaChange](readerProps, topicName) ReaderCustomResourceHandling[EdlaChange](readerProps, topicName) ReaderUsingsResourceHandling[EdlaChange](readerProps, topicName) + ReaderReadOnce[EdlaChange](readerProps, topicName) } } diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala new file mode 100644 index 0000000..cbef524 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.reader + +import io.circe.Decoder +import za.co.absa.kafkacase.reader.Reader + +import java.util.Properties + +object ReadOnce { + def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = + Reader.readOnce[T](readerProps, topicName, println) +} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala new file mode 100644 index 0000000..b0572c2 --- /dev/null +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.examples.writer + +import io.circe.Encoder +import za.co.absa.kafkacase.writer.Writer + +import java.util.Properties + +object WriteOnce { + def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = + Writer.writeOnce(writerProps, topicName, "sampleKey", sampleMessageToWrite) +} diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala index a93b18c..d57793d 100644 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala @@ -16,4 +16,20 @@ package za.co.absa.kafkacase.reader +import io.circe.Decoder + +import java.util.Properties + trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoCloseable + +object Reader { + def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { + val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false) + try { + for (item <- reader) + work(item) + } finally { + reader.close() + } + } +} diff --git a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala index 85b07c1..524f575 100644 --- a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala +++ b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala @@ -16,6 +16,10 @@ package za.co.absa.kafkacase.writer +import io.circe.Encoder + +import java.util.Properties + trait Writer[TType] extends AutoCloseable { def write(key: String, value: TType): Unit def flush(): Unit @@ -25,3 +29,14 @@ trait Writer[TType] extends AutoCloseable { flush() } } + +object Writer { + def writeOnce[T: Encoder](writerProps: Properties, topicName: String, messageKey: String, sampleMessageToWrite: T): Unit = { + val writer = new WriterImpl[T](writerProps, topicName) + try { + writer.Write(messageKey, sampleMessageToWrite) + } finally { + writer.close() + } + } +} From 2b182a53fae888246b36e712b662fba8cb292bc5 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Wed, 23 Oct 2024 12:46:24 +0200 Subject: [PATCH 2/3] post merge fix --- .../main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala | 4 ++-- .../examples/reader/{ReadOnce.scala => ReaderReadOnce.scala} | 2 +- .../writer/{WriteOnce.scala => WriterWriteOnce.scala} | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename examples/src/main/scala/za/co/absa/kafkacase/examples/reader/{ReadOnce.scala => ReaderReadOnce.scala} (97%) rename examples/src/main/scala/za/co/absa/kafkacase/examples/writer/{WriteOnce.scala => WriterWriteOnce.scala} (97%) diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala index e54d79f..853b672 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/KafkaCase.scala @@ -18,8 +18,8 @@ package za.co.absa.kafkacase.examples import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig -import za.co.absa.kafkacase.examples.reader.{ReaderCustomResourceHandling, ReaderManualResourceHandling, ReaderUsingsResourceHandling} -import za.co.absa.kafkacase.examples.writer.{WriterCustomResourceHandling, WriterManualResourceHandling, WriterUsingsResourceHandling} +import za.co.absa.kafkacase.examples.reader.{ReaderCustomResourceHandling, ReaderManualResourceHandling, ReaderReadOnce, ReaderUsingsResourceHandling} +import za.co.absa.kafkacase.examples.writer.{WriterCustomResourceHandling, WriterManualResourceHandling, WriterUsingsResourceHandling, WriterWriteOnce} import za.co.absa.kafkacase.models.topics.EdlaChange import java.util.{Properties, UUID} diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala similarity index 97% rename from examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala rename to examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala index cbef524..5d0e0e4 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReadOnce.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderReadOnce.scala @@ -21,7 +21,7 @@ import za.co.absa.kafkacase.reader.Reader import java.util.Properties -object ReadOnce { +object ReaderReadOnce { def apply[T: Decoder](readerProps: Properties, topicName: String): Unit = Reader.readOnce[T](readerProps, topicName, println) } diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala similarity index 97% rename from examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala rename to examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala index b0572c2..985fc14 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriteOnce.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/writer/WriterWriteOnce.scala @@ -21,7 +21,7 @@ import za.co.absa.kafkacase.writer.Writer import java.util.Properties -object WriteOnce { +object WriterWriteOnce { def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit = Writer.writeOnce(writerProps, topicName, "sampleKey", sampleMessageToWrite) } From 580f2524ca00cd9ffe2d5d247674025eb610c45e Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Wed, 23 Oct 2024 12:49:56 +0200 Subject: [PATCH 3/3] post merge fix2 --- writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala index 524f575..e63248c 100644 --- a/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala +++ b/writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala @@ -34,7 +34,7 @@ object Writer { def writeOnce[T: Encoder](writerProps: Properties, topicName: String, messageKey: String, sampleMessageToWrite: T): Unit = { val writer = new WriterImpl[T](writerProps, topicName) try { - writer.Write(messageKey, sampleMessageToWrite) + writer.write(messageKey, sampleMessageToWrite) } finally { writer.close() }