Firstly, I'm a big fan of the library - it has made interfacing with Kafka a breeze! Apologies if this is the wrong place to raise this and I should be looking at the vulcan repository instead.
I have recently hit an issue when trying to deserialise `Array[Byte]` keys and values when using the Vulcan module with a Schema Registry. When referenced in nested models this data type works fine. I'm aware this is a slightly odd interface, but unfortunately it is what we have to work with.
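For reference, the setup is roughly the following (a minimal sketch, not the exact code - the registry URL is a placeholder for the testcontainers endpoint):

```scala
import cats.effect.{IO, Resource}
import fs2.kafka._
import fs2.kafka.vulcan.{avroDeserializer, AvroSettings, SchemaRegistryClientSettings}
import vulcan.Codec

object ByteDeserializers {
  // Codec.bytes maps Array[Byte] to the Avro "bytes" schema type.
  implicit val bytesCodec: Codec[Array[Byte]] = Codec.bytes

  // Placeholder registry URL; in the real test this points at the
  // testcontainers Schema Registry.
  val avroSettings: AvroSettings[IO] =
    AvroSettings(SchemaRegistryClientSettings[IO]("http://localhost:8081"))

  // Registry-backed deserializers for Array[Byte] keys and values.
  val keyDeserializer: Resource[IO, KeyDeserializer[IO, Array[Byte]]] =
    avroDeserializer[Array[Byte]].forKey(avroSettings)

  val valueDeserializer: Resource[IO, ValueDeserializer[IO, Array[Byte]]] =
    avroDeserializer[Array[Byte]].forValue(avroSettings)
}
```

Using these deserializers directly for the top-level key/value (rather than for a field of a record codec) is what triggers the error below.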
```
vulcan.AvroException$$anon$1: Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer
  at vulcan.AvroException$.apply(AvroException.scala:18)
  at vulcan.AvroError$ErrorDecodingType.throwable(AvroError.scala:93)
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$extension$2(AvroDeserializer.scala:57)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$3(AvroDeserializer.scala:40)
  at defer @ fs2.kafka.vulcan.AvroSerializer$.$anonfun$create$3(AvroSerializer.scala:38)
  at product$extension @ fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:242)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:208)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:275)
  at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:272)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:277)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:317)
  at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:279)
  at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:28)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:303)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:423)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:420)
```
I have reproduced this in a simple test scenario running the Confluent 7.5.3 Kafka stack via testcontainers in
I can see that the underlying error is thrown by this block in the vulcan Codec. But what I am struggling to understand is why the value is being read as an `Array[Byte]` and not as the `ByteBuffer` it should have been encoded as.
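As a stopgap, I can work around it with a lenient codec whose decode accepts either representation. This is a sketch only (untested, and it assumes the public `Codec.instance` signature from vulcan 1.x, taking a schema, an encode function, and a `(Any, Schema)` decode function):

```scala
import vulcan.Codec

object LenientBytes {
  // Reuses Codec.bytes for the schema and encoding (which produces a
  // ByteBuffer), but tolerates a raw Array[Byte] at decode time - the
  // value the registry deserializer appears to hand back in this case.
  val codec: Codec[Array[Byte]] =
    Codec.instance(
      Codec.bytes.schema,
      Codec.bytes.encode,
      (value, schema) =>
        value match {
          case bytes: Array[Byte] => Right(bytes)
          // Fall back to the stock decoder for the ByteBuffer case.
          case other              => Codec.bytes.decode(other, schema)
        }
    )
}
```

Obviously I'd rather understand the root cause than ship this, but it confirms the failure is purely about the runtime representation handed to the codec.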
Scala version: 2.12.18
fs2-kafka version: 3.5.1