[SUPPORT] Can't read data from kafka in avro format and write them to hudi table
Hi guys, I'm trying to sink data from my Kafka (Redpanda) topics to a Hudi table, using file-based storage as a first step. I can't figure out what is wrong with my sink configuration: when I try to sink, I get an error saying the message can't be deserialized.
Here is an example of the error message:
2024-11-23 21:45:51,310 WARN || Error received while writing records for transaction 20241123214451614 in partition 1 [org.apache.hudi.connect.transaction.ConnectTransactionParticipant]
java.lang.ClassCastException: class org.apache.kafka.connect.data.Struct cannot be cast to class org.apache.avro.generic.GenericRecord (org.apache.kafka.connect.data.Struct and org.apache.avro.generic.GenericRecord are in unnamed module of loader 'app')
at org.apache.hudi.connect.writers.AbstractConnectWriter.writeRecord(AbstractConnectWriter.java:73)
at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.writeRecords(ConnectTransactionParticipant.java:219)
at org.apache.hudi.connect.transaction.ConnectTransactionParticipant.processRecords(ConnectTransactionParticipant.java:137)
at org.apache.hudi.connect.HoodieSinkTask.put(HoodieSinkTask.java:114)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Here is my Kafka sink connector configuration:
{
  "name": "hudi-sink",
  "config": {
    "bootstrap.servers": "redpanda-0:9092",
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "tasks.max": "6",
}
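For reference, the example sink config in the hudi-kafka-connect quickstart looks roughly like the sketch below. The topic, table name, base path, record key, partition field and schema-registry subject are placeholder values I filled in for my employees data, and I'm not certain every property name is exact for Hudi 1.0.0:
{
  "name": "hudi-sink",
  "config": {
    "bootstrap.servers": "redpanda-0:9092",
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "tasks.max": "6",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "topics": "dev.employees.employees",
    "hoodie.table.name": "employees",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.base.path": "file:///tmp/hoodie/employees",
    "hoodie.datasource.write.recordkey.field": "emp_no",
    "hoodie.datasource.write.partitionpath.field": "hire_date",
    "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
    "hoodie.deltastreamer.schemaprovider.registry.url": "http://redpanda-0:8090/subjects/dev.employees.employees-value/versions/latest",
    "hoodie.kafka.commit.interval.secs": "60"
  }
}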
Here is my MySQL source Kafka Connect configuration:
{
  "name": "mysql-source-connector-employees",
  "config": {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://redpanda-0:8090",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://redpanda-0:8090",
    "value.converter.schemas.enable": "true",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "before,after,op",
    "transforms.unwrap.drop.tombstone": "false",
    "transforms.unwrap.operation.header": "true",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "group.id": "connect-cluster",
    "database.hostname": "host.docker.internal",
    "incremental.snapshot.chunk.size": "100000",
    "max.batch.size": "50000",
    "max.queue.size": "1000000",
    "connectionTimeZone": "Europe/Paris",
    "database.port": "3306",
    "database.user": "dbz",
    "database.password": "****",
    "database.server.id": "5",
    "database.include.list": "employees",
    "database.history.kafka.bootstrap.servers": "redpanda-0:9092",
    "database.history.kafka.topic": "schema-changes.employees",
    "include.schema.changes": "true",
    "topic.prefix": "dev",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda-0:9092",
    "schema.history.internal.kafka.topic": "schema-internal-changes.employees",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "12",
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "producer.override.compression.type": "lz4",
    "offset.flush.timeout.ms": "10000",
    "offset.flush.interval.ms": "5000",
    "producer.override.linger.ms": "500",
    "producer.override.batch.size": "2000",
    "producer.override.acks": "1",
    "poll.interval.ms": "50"
  }
}
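One thing I'm not sure about in the source config above: I set several transforms.unwrap.* properties, but I never list the transform under "transforms", and as far as I understand Kafka Connect an SMT only runs when it is declared there, e.g.:
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
I don't know whether that is related to the Hudi sink error, but I wanted to mention it.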
I'm using Debezium to ingest, and the data is stored in Avro format in my topics.
Thanks for any help and tips; I'm new to Hudi, so I'm not aware of everything.
Environment Description
Hudi version : 1.0.0
Spark version : N/A
Hive version : N/A
Hadoop version : 2.10.1
Running on Docker? (yes/no) : yes