
[SUPPORT] - Cannot Ingest Protobuf records using Hudi Streamer #12301

Open · remeajayi2022 opened this issue Nov 20, 2024 · 11 comments
Labels: hudistreamer (issues related to Hudi Streamer, formerly Deltastreamer)

remeajayi2022 commented Nov 20, 2024

I’m trying to ingest from a ProtoKafka source using Hudi Streamer but encountering an issue.

Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
        at ...
Error reading source schema from registry. Please check hoodie.streamer.schemaprovider.registry.url is configured correctly. Truncated URL: https://....ons/latest
     at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:111)
        at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:204)
        ... 10 more
...
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to parse schema from registry: syntax = "proto3";
package datagen;
...
Caused by: java.lang.NoSuchMethodException: org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.newInstance(Class.java:412)
        ... 13 more 

The stack trace points to a misconfigured schema registry URL. However, the same URL works for Hudi streamer jobs ingesting from AvroKafka sources. When I ping the schema registry URL using curl, it correctly returns the schema.
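
For reference, the registry check was along these lines (all placeholders are illustrative), and it returns the Protobuf schema:

# Verify the registry serves the subject directly (basic-auth placeholders):
curl -u <schema-registry-key>:<schema-registry-secret> \
  https://<schema-registry-url>/subjects/<topic-name>-value/versions/latest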

Additional Context

  1. I've verified the Protobuf schema is valid; it is a sample proto schema from Confluent's Datagen connector.
  2. I've confirmed the schema registry URL is configured correctly; it works fine with a similar AvroKafka Spark job.
  3. I added hoodie.streamer.schemaprovider.proto.class.name and hoodie.streamer.source.kafka.proto.value.deserializer.class=org.apache.kafka.common.serialization.ByteArrayDeserializer. I don't think these are required, but their presence or absence did not resolve this error.

Environment Details
Hudi version: v0.15.0
Spark version: 3.1.3
Scala version: 2.12
Google Dataproc version: 2.0.125-debian10

Spark Submit Command and Protobuf Configuration

gcloud dataproc jobs submit spark --cluster <GCP-CLUSTER> \
  --region us-central1 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --project <GCP-PROJECT> \
  --jars <jar-base-url>/jars/hudi-gcp-bundle-0.15.0.jar,<jar-base-url>/jars/spark-avro_2.12-3.1.1.jar,<jar-base-url>/jars/hudi-utilities-bundle-raw_2.12-0.15.0.jar,<jar-base-url>/jars/kafka-protobuf-provider-5.5.0.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';" \
  --hoodie-conf hoodie.streamer.schemaprovider.proto.class.name=<topic-name> \
  --hoodie-conf basic.auth.credentials.source=USER_INFO \
  --hoodie-conf schema.registry.basic.auth.user.info=<schema-registry-key>:<schema-registry-secret> \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.url=https://<schema-registry-key>:<schema-registry-secret>@<schema-registry-url>/subjects/<topic-name>-value/versions/latest \
  --hoodie-conf hoodie.streamer.source.kafka.topic=<topic-name> \
  --hoodie-conf hoodie.streamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter

Steps to Reproduce

  1. Build a Hudi 0.15.0 JAR with Spark 3.1 and Scala 2.12.
  2. Use a Protobuf schema on an accessible schema registry, preferably an authenticated one.
  3. Configure the Hudi Streamer job with the spark-submit command above.
  4. Run the Spark job.

I’d appreciate any insights into resolving this issue.
Is there an alternative or a workaround for configuring the Protobuf schema?
Am I missing any configuration settings?
Thank you for your help!

remeajayi2022 commented Nov 21, 2024

It seems this was a known issue (#11598) in v0.15.0, documented here, with PR fixes (#11373, #11660) by @the-other-tim-brown that have been merged to master. I have tested with a JAR compiled from the master branch and I'm still running into issues.

24/11/21 21:30:36 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: java.lang.AbstractMethodError: Receiver class io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider does not define or inherit an implementation of the resolved method 'abstract java.util.Optional parseSchema(java.lang.String, java.util.List)' of interface io.confluent.kafka.schemaregistry.SchemaProvider.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]

Do you have any suggestions on how to move past this? Thank you.
cc: @the-other-tim-brown

the-other-tim-brown (Contributor) commented

@remeajayi2022 For the latest issue you are seeing, it looks like there may be conflicting Confluent versions on your classpath. I can see that the ProtobufSchemaProvider does implement that method in the 5.5.0 version, so I am not sure where it may be picking up a newer version that does not implement this interface.
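
One way to spot the conflict (a sketch; run it over whatever jars you pass to --jars) is to see which jars bundle the Confluent schema-registry classes:

# List jars that contain Confluent schema-registry classes, to find
# duplicate or conflicting provider versions on the classpath.
for j in *.jar; do
  unzip -l "$j" | grep -q 'io/confluent/kafka/schemaregistry' && echo "$j"
done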

remeajayi2022 commented Nov 24, 2024

Hi @the-other-tim-brown,

Thank you so much for taking the time to help with this earlier! I appreciate your insights. Following your suggestion, I removed the kafka-protobuf-provider and kafka-json-schema-provider jars that I had previously; they were a higher version and the cause of those compatibility issues. However, I'm now running into a ClassNotFoundException with my configuration:

gcloud dataproc jobs submit spark \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --jars <storage-path>/jars/hudi-utilities-bundle_2.12-1.0.0-SNAPSHOT.jar \
    -- \
    --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --hoodie-conf schema.registry.url=<url> \
    --hoodie-conf hoodie.streamer.schemaprovider.registry.url=<full-url>/subjects/datagen-proto-value/versions/latest \
    --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \
    --hoodie-conf hoodie.streamer.source.kafka.proto.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer

When I run the job, I see the following error:
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider

My understanding was that the kafka-protobuf-provider jar supplies this class but that I wouldn't need to add it myself, since a jar bundled with Hudi already pulls it in. However, I have checked the classes included in my bundle jar, and ProtobufSchemaProvider and JsonSchemaProvider are not among them.
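
A check along these lines (jar name as in the command above) shows no matches:

# Confirm whether the provider classes made it into the bundle jar.
unzip -l hudi-utilities-bundle_2.12-1.0.0-SNAPSHOT.jar \
  | grep -E 'ProtobufSchemaProvider|JsonSchemaProvider'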

There may still be a missing dependency or configuration issue that I haven’t accounted for. Do you have any suggestions on how I could resolve these errors? Am I possibly overlooking a required library or some specific classpath setup?

Thanks again for your time and support—it’s greatly appreciated!

the-other-tim-brown (Contributor) commented

@remeajayi2022 you need to include the jars for the providers. They should be set to version 5.5.0 to maintain compatibility with the Confluent core code included in the Hudi bundles. When we run this, we use a bundled jar on our path to make sure that all the dependencies of our dependencies are also included.
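
For example (paths illustrative, matching the layout in the commands above), the --jars list would carry the 5.5.0 providers alongside the bundle:

--jars <storage-path>/jars/hudi-utilities-bundle_2.12-1.0.0-SNAPSHOT.jar,\
<storage-path>/jars/kafka-protobuf-provider-5.5.0.jar,\
<storage-path>/jars/kafka-json-schema-provider-5.5.0.jar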

remeajayi2022 commented Dec 5, 2024

Thanks, @the-other-tim-brown, for your response. I apologize for not including this earlier, but I had already added the Protobuf and JSON provider jars at version 5.5.0 in previous runs, and that resulted in a Protobuf compatibility error:

Caused by: java.lang.NoClassDefFoundError: com/google/protobuf/Descriptors$DescriptorValidationException

the-other-tim-brown (Contributor) commented

@remeajayi2022 I will try to build a sample bundled jar with what we're using internally to help unblock you this weekend. Apologies for the delay.

the-other-tim-brown (Contributor) commented

@remeajayi2022 Looking at my own deployment, I have replaced the protobuf jar provided by Spark. The version provided by Spark 3.1.3 is 2.5.0, which is not compatible; the Hudi master branch currently relies on 3.25.5.
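
On a self-managed install, the swap is roughly as follows (paths are assumptions based on a standard Spark layout):

# Replace Spark's bundled protobuf 2.5.0 with the version Hudi master expects.
ls "$SPARK_HOME/jars" | grep protobuf          # expect protobuf-java-2.5.0.jar
rm "$SPARK_HOME/jars/protobuf-java-2.5.0.jar"
cp protobuf-java-3.25.5.jar "$SPARK_HOME/jars/"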

remeajayi2022 commented Dec 9, 2024

Hi @the-other-tim-brown, thanks for your comment. Can you tell me which jars you included and the Spark version for your deployment? I only used the Protobuf jar provided by Confluent; I don't know which one you are referring to above.

the-other-tim-brown (Contributor) commented

@remeajayi2022 There is a protobuf jar included in the Spark runtime already; you would need to remove that jar from the jars directory.

remeajayi2022 commented Dec 9, 2024

Does your deployment use Dataproc? I can't really modify the Spark runtime since it's a managed service. The highest supported Spark version on Dataproc is v3.5.1, which uses v3.23.4 of Protobuf. I've tested the job with this Spark version, and, as expected from your explanation, there are still Protobuf compatibility issues.

I did try to force the job to use hudi's protobuf version with this:
--properties spark.driver.userClassPathFirst=true,spark.executor.userClassPathFirst=true \
  --jars gs://dp-data-eng-dev-storage/jars/protobuf-java-3.25.5.jar,.......

But this resulted in a different ClassNotFoundException.

Do you mind providing more details about the setup that worked for you so I can replicate that?

the-other-tim-brown (Contributor) commented

@remeajayi2022 Unfortunately I cannot reveal too much detail about the Onehouse managed solution for Hudi. If you run with Spark 3.5.1, can you try omitting your own version of protobuf from the path to see if this works? I think the 3.20+ range should work, but you cannot have two versions on the path.
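
That is, rely on the runtime's copy alone; a quick check (the jars path is an assumption for Dataproc images) shows what is already bundled:

# Confirm the protobuf version the Dataproc Spark 3.5.1 runtime ships,
# then submit without any protobuf-java jar in --jars.
ls /usr/lib/spark/jars | grep protobuf         # e.g. protobuf-java-3.23.4.jar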
