[SUPPORT] Can't read data from kafka in avro format and write them to hudi table #12324

Open
langelloantoine opened this issue Nov 24, 2024 · 1 comment

@langelloantoine

Hi guys, I'm trying to sink data from my Kafka (Redpanda) topics to a Hudi table, using local file storage for now. I can't figure out what is wrong with my sink configuration: whenever the sink tries to write, it fails with a message saying the records can't be deserialized.
Here is an example of the error message:
2024-11-23 21:45:51,310 WARN || Error received while writing records for transaction 20241123214451614 in partition 1 [org.apache.hudi.connect.transaction.ConnectTransactionParticipant]
java.lang.ClassCastException: class org.apache.kafka.connect.data.Struct cannot be cast to class org.apache.avro.generic.GenericRecord (org.apache.kafka.connect.data.Struct and org.apache.avro.generic.GenericRecord are in unnamed module of loader 'app')
at org.apache.hudi.connect.writers.AbstractConnectWriter.writeRecord(AbstractConnectWriter.java:73)
at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.writeRecords(ConnectTransactionParticipant.java:219)
at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.processRecords(ConnectTransactionParticipant.java:137)
at org.apache.hudi.connect.HoodieSinkTask.put(HoodieSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Here is my Kafka sink connector configuration:
{
  "name": "hudi-sink",
  "config": {
    "bootstrap.servers": "redpanda-0:9092",
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "tasks.max": "6",

    "key.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://redpanda-0:8090",
    "value.converter.schema.registry.url": "http://redpanda-0:8090",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",

    "topics": "dev.employees.employees",

    "hoodie.write.payload.class": "org.apache.hudi.common.model.HoodieAvroPayload",
    "hoodie.write.payload.serializer.class": "org.apache.hudi.common.model.HoodieAvroPayloadSerializer",
    "hoodie.table.name": "dev_employees_employees",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.base.path": "file:///tmp/hoodie/dev_employees_employees",

    "hoodie.datasource.write.recordkey.field": "payload.after.emp_no",
    "hoodie.datasource.write.partitionpath.field": "payload.after.birth_date",

    "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
    "hoodie.streamer.schemaprovider.registry.url": "http://redpanda-0:8090/subjects/dev.employees.employees-value/versions/latest"
  }
}

Here is my MySQL source Kafka Connect configuration:

{
  "name": "mysql-source-connector-employees",
  "config": {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://redpanda-0:8090",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://redpanda-0:8090",
    "value.converter.schemas.enable": "true",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "before,after,op",
    "transforms.unwrap.drop.tombstone": "false",
    "transforms.unwrap.operation.header": "true",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "group.id": "connect-cluster",
    "database.hostname": "host.docker.internal",
    "incremental.snapshot.chunk.size": "100000",
    "max.batch.size": "50000",
    "max.queue.size": "1000000",
    "connectionTimeZone": "Europe/Paris",
    "database.port": "3306",
    "database.user": "dbz",
    "database.password": "****",
    "database.server.id": "5",
    "database.include.list": "employees",
    "database.history.kafka.bootstrap.servers": "redpanda-0:9092",
    "database.history.kafka.topic": "schema-changes.employees",
    "include.schema.changes": "true",
    "topic.prefix": "dev",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda-0:9092",
    "schema.history.internal.kafka.topic": "schema-internal-changes.employees",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "12",
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "producer.override.compression.type": "lz4",
    "offset.flush.timeout.ms": "10000",
    "offset.flush.interval.ms": "5000",
    "producer.override.linger.ms": "500",
    "producer.override.batch.size": "2000",
    "producer.override.acks": "1",
    "poll.interval.ms": "50"
  }
}

I'm using Debezium to ingest the data, and the records are stored in Avro format in my topics.
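
For context, the values on dev.employees.employees follow the standard Debezium change-event envelope. A rough sketch of one record is shown below (other columns omitted, field values purely illustrative, and the shape assumes the unwrap SMT is not applied); the recordkey and partitionpath fields in the sink configuration above point at emp_no and birth_date inside the nested "after" section:

{
  "before": null,
  "after": {
    "emp_no": 10001,
    "birth_date": "1953-09-02"
  },
  "source": { "connector": "mysql", "db": "employees", "table": "employees" },
  "op": "c",
  "ts_ms": 1732484751310
}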

Thanks for any help and tips. I'm new to Hudi, so I may be missing something.

Environment Description

  • Hudi version : 1.0.0

  • Spark version : N/A

  • Hive version : N/A

  • Hadoop version : 2.10.1

  • Running on Docker? (yes/no) : yes

@rangareddy

Hi @langelloantoine

Could you please try adding the value.deserializer.specific.avro.reader=true parameter to the Hudi sink connector?
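
A minimal sketch of how that suggestion could be slotted into the sink connector JSON above (only the relevant keys shown, the rest of the configuration unchanged; depending on the Connect worker setup, consumer-level settings may also need a consumer.override. prefix, so treat this as an untested assumption):

{
  "name": "hudi-sink",
  "config": {
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "value.deserializer.specific.avro.reader": "true"
  }
}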
