diff --git a/build.sbt b/build.sbt index 92152b1..59bb030 100644 --- a/build.sbt +++ b/build.sbt @@ -19,6 +19,7 @@ val flinkVersion = "1.8.0" libraryDependencies += "org.apache.avro" % "avro" % "1.8.2" libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided +libraryDependencies += "org.apache.flink" % "flink-avro" % flinkVersion // make run command include the provided dependencies Compile / run := Defaults.runTask(Compile / fullClasspath, @@ -32,3 +33,7 @@ Global / cancelable := true // exclude Scala library from assembly assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false) + +Compile / sourceGenerators += (Compile / avroScalaGenerateSpecific).taskValue + +watchSources ++= ((Compile / avroSourceDirectories).value ** "*.avsc").get diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..dcf9fec --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC15") diff --git a/src/main/avro/product-description.avsc b/src/main/avro/product-description.avsc new file mode 100644 index 0000000..a06cae3 --- /dev/null +++ b/src/main/avro/product-description.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "namespace": "nl.mrooding.data", + "name": "ProductDescription", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "description", + "type": "string" + }, + { + "name": "updatedAt", + "type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + }, + "default": 0 + } + ] +} diff --git a/src/main/avro/product-stock.avsc b/src/main/avro/product-stock.avsc new file mode 100644 index 0000000..64f83e5 --- /dev/null +++ b/src/main/avro/product-stock.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "namespace": "nl.mrooding.data", + "name": "ProductStock", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "stock", + "type": "long" + }, + { + "name": "updatedAt", +
"type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + }, + "default": 0 + } + ] +} diff --git a/src/main/avro/product.avsc b/src/main/avro/product.avsc new file mode 100644 index 0000000..bb957d6 --- /dev/null +++ b/src/main/avro/product.avsc @@ -0,0 +1,29 @@ +{ + "type": "record", + "namespace": "nl.mrooding.data", + "name": "Product", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "description", + "type": ["null", "string"], + "default": null + }, + { + "name": "stock", + "type": ["null", "long"], + "default": null + }, + { + "name": "updatedAt", + "type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + }, + "default": 0 + } + ] +} diff --git a/src/main/resources/avro/product-description.avsc b/src/main/resources/avro/product-description.avsc new file mode 100644 index 0000000..d7a4f74 --- /dev/null +++ b/src/main/resources/avro/product-description.avsc @@ -0,0 +1,22 @@ +{ + "type": "record", + "name": "ProductDescription", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "description", + "type": "string" + }, + { + "name": "updatedAt", + "type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + }, + "default": 0 + } + ] +} diff --git a/src/main/resources/avro/product-stock.avsc b/src/main/resources/avro/product-stock.avsc new file mode 100644 index 0000000..bff29cb --- /dev/null +++ b/src/main/resources/avro/product-stock.avsc @@ -0,0 +1,22 @@ +{ + "type": "record", + "name": "ProductStock", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "stock", + "type": "long" + }, + { + "name": "updatedAt", + "type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + }, + "default": 0 + } + ] +} diff --git a/src/main/resources/avro/product.avsc b/src/main/resources/avro/product.avsc index 9040bfe..7d8442d 100644 --- a/src/main/resources/avro/product.avsc +++ b/src/main/resources/avro/product.avsc @@ -18,9 +18,11 @@ }, { "name": "updatedAt", - "type":
"long", - "logicalType": "timestamp-millis", - "default": 0 + "type" : { + "type" : "long", + "logicalType" : "timestamp-millis" + }, + "default": 0 } ] } diff --git a/src/main/scala/nl/mrooding/ProductAggregator.scala b/src/main/scala/nl/mrooding/ProductAggregator.scala index d58567a..168187c 100644 --- a/src/main/scala/nl/mrooding/ProductAggregator.scala +++ b/src/main/scala/nl/mrooding/ProductAggregator.scala @@ -1,9 +1,9 @@ package nl.mrooding import nl.mrooding.data.{ProductDescription, ProductStock} -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.api.scala._ import nl.mrooding.source.{ProductDescriptionSource, ProductStockSource} +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object ProductAggregator { private[this] val intervalMs = 1000 diff --git a/src/main/scala/nl/mrooding/ProductProcessor.scala b/src/main/scala/nl/mrooding/ProductProcessor.scala index 6c30812..981af59 100644 --- a/src/main/scala/nl/mrooding/ProductProcessor.scala +++ b/src/main/scala/nl/mrooding/ProductProcessor.scala @@ -7,7 +7,7 @@ import org.apache.flink.util.Collector case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product] { private[this] lazy val stateDescriptor: ValueStateDescriptor[Product] = - new ValueStateDescriptor[Product]("product-join", Product.serializer) + new ValueStateDescriptor[Product]("product-join", classOf[Product]) private[this] lazy val state: ValueState[Product] = getRuntimeContext.getState(stateDescriptor) override def processElement1(value: ProductDescription, ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]): Unit = { diff --git a/src/main/scala/nl/mrooding/data/ProductDescriptionOld.scala b/src/main/scala/nl/mrooding/data/ProductDescriptionOld.scala new file mode 100644 index 0000000..230ddd7 --- /dev/null +++
b/src/main/scala/nl/mrooding/data/ProductDescriptionOld.scala @@ -0,0 +1,7 @@ +package nl.mrooding.data + +import java.time.Instant + +case class ProductDescriptionOld(id: String, + description: String, + updatedAt: Instant) diff --git a/src/main/scala/nl/mrooding/data/ProductOld.scala b/src/main/scala/nl/mrooding/data/ProductOld.scala new file mode 100644 index 0000000..ce050a2 --- /dev/null +++ b/src/main/scala/nl/mrooding/data/ProductOld.scala @@ -0,0 +1,40 @@ +package nl.mrooding.data + +import java.time.Instant + +import nl.mrooding.state.ProductSerializer +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.flink.api.common.typeutils.TypeSerializer + +case class ProductOld( + id: String, + description: Option[String], + stock: Option[Long], + updatedAt: Instant + ) extends AvroGenericRecordWriter { + + def toGenericRecord: GenericRecord = { + val genericRecord = new GenericData.Record(ProductOld.getCurrentSchema) + genericRecord.put("id", id) + genericRecord.put("description", description.orNull) + genericRecord.put("stock", stock.map(Long.box).orNull) + genericRecord.put("updatedAt", updatedAt.toEpochMilli) + + genericRecord + } +} + +object ProductOld extends AvroSchema with AvroSerializable[ProductOld] { + val schemaPath: String = "/avro/product.avsc" + + val serializer: TypeSerializer[ProductOld] = new ProductSerializer(None) + + def apply(record: GenericRecord): ProductOld = { + ProductOld( + id = record.get("id").toString, + description = Option(record.get("description")).map(_.toString), + stock = Option(record.get("stock")).map(_.asInstanceOf[Long]), + updatedAt = Instant.ofEpochMilli(record.get("updatedAt").asInstanceOf[Long]) + ) + } +} diff --git a/src/main/scala/nl/mrooding/data/ProductStockOld.scala b/src/main/scala/nl/mrooding/data/ProductStockOld.scala new file mode 100644 index 0000000..44ba40f --- /dev/null +++ b/src/main/scala/nl/mrooding/data/ProductStockOld.scala @@ -0,0 +1,7 @@ +package nl.mrooding.data + +import
java.time.Instant + +case class ProductStockOld(id: String, + stock: Long, + updatedAt: Instant) diff --git a/src/main/scala/nl/mrooding/state/ProductSerializer.scala b/src/main/scala/nl/mrooding/state/ProductSerializer.scala index e83dfe8..f282b5e 100644 --- a/src/main/scala/nl/mrooding/state/ProductSerializer.scala +++ b/src/main/scala/nl/mrooding/state/ProductSerializer.scala @@ -1,21 +1,21 @@ package nl.mrooding.state -import nl.mrooding.data.Product +import nl.mrooding.data.ProductOld import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.util.InstantiationUtil -class ProductSerializer(val stateSchema: Option[Schema]) extends CustomAvroSerializer[Product] { +class ProductSerializer(val stateSchema: Option[Schema]) extends CustomAvroSerializer[ProductOld] { - override def getCurrentSchema: Schema = Product.getCurrentSchema + override def getCurrentSchema: Schema = ProductOld.getCurrentSchema - override def fromGenericRecord(genericRecord: GenericRecord): Product = Product.apply(genericRecord) + override def fromGenericRecord(genericRecord: GenericRecord): ProductOld = ProductOld.apply(genericRecord) - override def duplicate(): TypeSerializer[Product] = + override def duplicate(): TypeSerializer[ProductOld] = new ProductSerializer(stateSchema) - override def createInstance(): Product = InstantiationUtil.instantiate(classOf[Product]) + override def createInstance(): ProductOld = InstantiationUtil.instantiate(classOf[ProductOld]) - override def snapshotConfiguration(): TypeSerializerSnapshot[Product] = new ProductSerializerSnapshot() + override def snapshotConfiguration(): TypeSerializerSnapshot[ProductOld] = new ProductSerializerSnapshot() } diff --git a/src/main/scala/nl/mrooding/state/ProductSerializerSnapshot.scala b/src/main/scala/nl/mrooding/state/ProductSerializerSnapshot.scala index 785934e..7d62d03 100644 --- 
a/src/main/scala/nl/mrooding/state/ProductSerializerSnapshot.scala +++ b/src/main/scala/nl/mrooding/state/ProductSerializerSnapshot.scala @@ -1,15 +1,15 @@ package nl.mrooding.state -import nl.mrooding.data.Product +import nl.mrooding.data.ProductOld import org.apache.avro.Schema import org.apache.flink.api.common.typeutils.TypeSerializer -class ProductSerializerSnapshot(var stateSchema: Option[Schema]) extends CustomAvroSerializerSnapshot[Product] { +class ProductSerializerSnapshot(var stateSchema: Option[Schema]) extends CustomAvroSerializerSnapshot[ProductOld] { def this() = { - this(None) + this(None) } - override def getCurrentSchema: Schema = Product.getCurrentSchema + override def getCurrentSchema: Schema = ProductOld.getCurrentSchema - override def restoreSerializer(): TypeSerializer[Product] = new ProductSerializer(stateSchema) + override def restoreSerializer(): TypeSerializer[ProductOld] = new ProductSerializer(stateSchema) }