diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c07d537..17864fa 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -13,6 +13,7 @@ jobs: distribution: temurin java-version: 11 check-latest: true + - uses: sbt/setup-sbt@v1 - name: Set output id: vars run: echo ::set-output name=tag::${GITHUB_REF#refs/*/} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3a82c73..88fd883 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,5 +15,6 @@ jobs: distribution: temurin java-version: 11 check-latest: true + - uses: sbt/setup-sbt@v1 - name: Tests run: sbt clean test diff --git a/build.sbt b/build.sbt index 6052e4d..79568c1 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,6 @@ import org.typelevel.scalacoptions.ScalacOptions +import sbtassembly.AssemblyPlugin.autoImport.* + name := "my_custom_deserializers" version := sys.env.getOrElse("CREATED_TAG", "0.1") @@ -7,7 +9,9 @@ libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.4.0" libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf", "com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0" % "protobuf", - "com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0" + "com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.14.3" + ) Compile / tpolecatExcludeOptions ++= Set( @@ -35,3 +39,8 @@ ThisBuild / credentials += Credentials( GITHUB_OWNER, System.getenv("GITHUB_TOKEN") ) + +assembly / assemblyMergeStrategy := { + case PathList("META-INF", "versions", "9", "module-info.class") => MergeStrategy.first + case x => (assembly / assemblyMergeStrategy).value(x) +} \ No newline at end of file diff --git a/src/main/scala/io/example/conduktor/custom/deserializers/MyCustomJsonNodeDeserializer.scala b/src/main/scala/io/example/conduktor/custom/deserializers/MyCustomJsonNodeDeserializer.scala new file mode 100644 index 0000000..cb2e396 --- /dev/null +++ b/src/main/scala/io/example/conduktor/custom/deserializers/MyCustomJsonNodeDeserializer.scala @@ -0,0 +1,11 @@ +package io.example.conduktor.custom.deserializers + +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import org.apache.kafka.common.serialization.Deserializer + +class MyCustomJsonNodeDeserializer extends Deserializer[JsonNode] { + override def deserialize(topic: String, data: Array[Byte]): JsonNode = { + val mapper = new ObjectMapper() + mapper.readTree(data) + } +} \ No newline at end of file