Cannot deserialize Array[Byte] fields via Avro with Schema Registry #1330

onepintwig opened this issue Jun 10, 2024 · 1 comment

onepintwig commented Jun 10, 2024

Hello

Firstly, I'm a big fan of the library - it has made interfacing with Kafka a breeze! Apologies if this is the wrong place to raise this and I should be looking at the vulcan repository instead.

I have recently hit an issue when trying to deserialise Array[Byte] keys and values when using the Vulcan module with a Schema Registry. When referenced inside nested models this data type works fine (see the wrapper sketch after the tests below). I'm aware this is a slightly odd interface, but unfortunately it is what we have to work with.

vulcan.AvroException$$anon$1: Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer
  at vulcan.AvroException$.apply(AvroException.scala:18)
  at vulcan.AvroError$ErrorDecodingType.throwable(AvroError.scala:93)
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$extension$2(AvroDeserializer.scala:57)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$3(AvroDeserializer.scala:40)
  at defer @ fs2.kafka.vulcan.AvroSerializer$.$anonfun$create$3(AvroSerializer.scala:38)
  at product$extension @ fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:242)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:208)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:275)
  at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:272)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:277)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:317)
  at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:279)
  at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:28)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:303)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:423)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:420)

I have reproduced this in a simple test scenario, running against the Confluent 7.5.3 Kafka stack via testcontainers, with:

Scala version: 2.12.18
fs2-kafka version: 3.5.1

// Assumed scaffolding for the snippets below: the tests live in a ScalaTest
// AnyFlatSpec with Matchers, and schemaRegistryUrl / bootstrapServers come
// from the testcontainers setup described above.
import java.util.UUID

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.kafka._
import fs2.kafka.vulcan._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

//Fails
"SerDes" should "Serialize and deserialize ByteArray keys" in {

    val as: AvroSettings[IO] = AvroSettings {
      SchemaRegistryClientSettings[IO](schemaRegistryUrl)
    }

    val producerSettings =
      ProducerSettings[IO, Array[Byte], String](avroSerializer[Array[Byte]].forKey(as), avroSerializer[String].forValue(as))
        .withBootstrapServers(bootstrapServers)

    val consumerSettings =
      ConsumerSettings[IO, Array[Byte], String](avroDeserializer[Array[Byte]].forKey(as), avroDeserializer[String].forValue(as))
        .withBootstrapServers(bootstrapServers)
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withGroupId(UUID.randomUUID().toString)

    val topic = "inputTopic1"
    val keyBytes = "123".getBytes
    val value = "Test"

    val test = for {
      _ <- KafkaProducer.resource(producerSettings).use(producer => producer.produce(ProducerRecords.one(ProducerRecord(topic, keyBytes, value))))
      _ <- IO.sleep(1.second)
      result <- KafkaConsumer.resource(consumerSettings).use(consumer => consumer.subscribeTo(topic).flatMap(_ => consumer.records.take(1).compile.toList.map(_.head)))
    } yield (result.record.key, result.record.value)

    test.unsafeRunSync() shouldBe (keyBytes, value)

  }

  //Succeeds
  it should "Serialize and deserialize String keys and values" in {

    val as: AvroSettings[IO] = AvroSettings {
      SchemaRegistryClientSettings[IO](schemaRegistryUrl)
    }

    val producerSettings =
      ProducerSettings[IO, String, String](avroSerializer[String].forKey(as), avroSerializer[String].forValue(as))
        .withBootstrapServers(bootstrapServers)

    val consumerSettings =
      ConsumerSettings[IO, String, String](avroDeserializer[String].forKey(as), avroDeserializer[String].forValue(as))
        .withBootstrapServers(bootstrapServers)
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withGroupId(UUID.randomUUID().toString)

    val topic = "inputTopic3"
    val key = "123"
    val value = "Test"

    val test = for {
      _ <- KafkaProducer.resource(producerSettings).use(producer => producer.produce(ProducerRecords.one(ProducerRecord(topic, key, value))))
      _ <- IO.sleep(1.second)
      result <- KafkaConsumer.resource(consumerSettings).use(consumer => consumer.subscribeTo(topic).flatMap(_ => consumer.records.take(1).compile.toList.map(_.head)))
    } yield (result.record.key, result.record.value)

    test.unsafeRunSync() shouldBe (key, value)

  }
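For comparison, the nested-model shape that does round-trip for us looks roughly like the sketch below. This is a minimal sketch only: KeyWrapper is a hypothetical name used purely for illustration, and `as` is the same AvroSettings value as in the tests above.

import vulcan.Codec

// Hypothetical wrapper record, for illustration only: wrapping the bytes in a
// record is the "nested model" case that deserializes without error.
final case class KeyWrapper(bytes: Array[Byte])

object KeyWrapper {
  implicit val codec: Codec[KeyWrapper] =
    Codec.record(
      name = "KeyWrapper",
      namespace = "com.example"
    ) { field =>
      field("bytes", _.bytes).map(KeyWrapper(_))
    }
}

// Serializers/deserializers built against the wrapper behave as expected, e.g.
// ProducerSettings[IO, KeyWrapper, String](
//   avroSerializer[KeyWrapper].forKey(as),
//   avroSerializer[String].forValue(as)
// )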

onepintwig commented Jun 11, 2024

I can see that the underlying error is thrown by this block in the vulcan Codec. What I am struggling to understand is why the value is being read back as an Array[Byte] rather than as the ByteBuffer it should have been encoded as.

I have raised an issue there.
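For reference, the decode path in question boils down to roughly the following (a paraphrased sketch, not the exact vulcan source): a top-level Avro bytes value is expected to arrive as a java.nio.ByteBuffer, and a raw byte[] falls through to the error branch seen in the stack trace above.

// Paraphrased sketch of the failing decode logic (not the exact vulcan source).
def decodeBytes(value: Any): Either[String, Array[Byte]] =
  value match {
    case buffer: java.nio.ByteBuffer =>
      Right(buffer.array())
    case other =>
      // Produces "Got unexpected type byte[], expected type ByteBuffer" when
      // the deserializer hands back a raw byte array instead of a ByteBuffer.
      Left(s"Error decoding Array[Byte]: Got unexpected type ${other.getClass.getSimpleName}, expected type ByteBuffer")
  }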
