[SUPPORT] Slow commit times with Spark Structured Streaming from Kinesis to MOR Hudi table #12412
Comments
@sumosha Tasks with disk spill are normally a lot slower. Do you see spills in the fast run as well? I also noticed very high GC time for them. Have you tuned your GC configs? https://hudi.apache.org/docs/tuning-guide/#gc-tuning
@ad1happy2go There does appear to be spill even in the faster commit (which explains why that job seems to stay consistent at around 15 minutes). I haven't been able to recreate the disk spill in my stress testing, so I assume it comes from the size difference in the underlying table and the files being written (production is around 100GB now; I started with a fresh table in testing and haven't built up a good size yet). I was planning to play around with this setting mentioned in your guide: We are currently on the default collector in EMR (Parallel) in production. I have switched to G1 (this is JDK 17) in stress testing, though I didn't see much change in the overall commit times. We'll move forward with G1 since it's recommended anyway.
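For readers following along, here is a minimal sketch of how the switch from the Parallel collector to G1 could be expressed as a Spark setting. `spark.executor.extraJavaOptions`, `-XX:+UseG1GC`, and `-XX:+UseParallelGC` are standard Spark and JVM names; the helper `with_g1` and the example input string are hypothetical and are not taken from the cluster discussed here. Setting `extraJavaOptions` replaces any existing value, so the sketch carries other flags over instead of discarding them.

```python
def with_g1(existing_opts: str = "") -> str:
    """Return a JVM options string that selects G1.

    Hypothetical helper: keeps every flag already present, drops an
    explicit Parallel collector flag, and appends the G1 flag once.
    """
    kept = [opt for opt in existing_opts.split() if opt != "-XX:+UseParallelGC"]
    if "-XX:+UseG1GC" not in kept:
        kept.append("-XX:+UseG1GC")
    return " ".join(kept)


# Illustrative input only; the real default options on a cluster will differ.
executor_opts = with_g1("-XX:+UseParallelGC -Dexample.flag=1")
print(f"--conf spark.executor.extraJavaOptions='{executor_opts}'")
```

The printed argument is meant to be passed to spark-submit; further G1 tuning flags from the tuning guide could be appended to the same string.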
@sumosha Is there any other difference you could point out between the fast run and the slow run? Is it possible to share the two Hudi commit files for them? We want to understand the number of file groups updated and the time taken to update each file group. You can also send them to me directly on the Apache Hudi Slack (Aditya Goenka) if you are not comfortable sharing them here.
@ad1happy2go Hi Aditya! I PM'ed you in Slack with the commits as requested. As far as I can tell, the only difference is those warnings in the logs during the slow commit. Thanks!
Describe the problem you faced
We could use some guidance on a performance issue we are seeing with a Spark Structured Streaming job reading from Kinesis and writing to a MOR Hudi table in EMR 7.2 (Hudi 0.14.1-amzn-1). We have noticed very long-running commits in Hudi, and the iterator age on the Kinesis stream falls behind by up to a day or more as data flows in.
Steps to reproduce the behavior:
Some details about the load, application, and the Hudi table:
What we have noticed is that deltacommits typically take around 15 minutes; however, every few commits there is an excessively long deltacommit that takes around 1.5-3 hours to complete. While this commit is completing, there are no GetRecords calls to the stream to continue processing. We compared the commit metrics between one of the faster commits and a slow commit, and there isn't a significant difference in the amount of data being written, maybe about 30k records. What I observed was that some log file writes with as few as 16 records and 100KB in size took ~4.5 minutes (totalUpsertTime). This seems an excessive amount of time for a log file (not parquet) with so few records.
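A comparison like this can be scripted. The sketch below assumes the deltacommit file has been downloaded and parsed as JSON, and that it contains a `partitionToWriteStats` map whose entries carry `fileId`, `numWrites`, `totalWriteBytes`, and a `runtimeStats.totalUpsertTime` field (assumed to be in milliseconds). Those field names are my reading of the Hudi 0.14 commit metadata layout and should be checked against an actual file. The function name and the sample values are hypothetical.

```python
def slowest_writes(commit_metadata: dict, top_n: int = 5) -> list[dict]:
    """List the write stats with the largest totalUpsertTime.

    Assumes the (unverified) layout:
    {"partitionToWriteStats": {partition: [ {stat}, ... ]}}
    """
    rows = []
    stats_by_partition = commit_metadata.get("partitionToWriteStats", {})
    for partition, stats in stats_by_partition.items():
        for stat in stats:
            runtime = stat.get("runtimeStats") or {}
            rows.append({
                "partition": partition,
                "fileId": stat.get("fileId"),
                "numWrites": stat.get("numWrites", 0),
                "totalWriteBytes": stat.get("totalWriteBytes", 0),
                "totalUpsertTime": runtime.get("totalUpsertTime", 0),
            })
    # Slowest file groups first
    rows.sort(key=lambda row: row["totalUpsertTime"], reverse=True)
    return rows[:top_n]


# Synthetic metadata for illustration; not taken from the real table.
sample = {
    "partitionToWriteStats": {
        "dt=2024-12-03": [
            {"fileId": "fg-1", "numWrites": 5000, "totalWriteBytes": 2_000_000,
             "runtimeStats": {"totalUpsertTime": 4_000}},
            {"fileId": "fg-2", "numWrites": 16, "totalWriteBytes": 100_000,
             "runtimeStats": {"totalUpsertTime": 270_000}},
        ]
    }
}

for row in slowest_writes(sample, top_n=2):
    print(row)
```

For a real commit, `sample` would be replaced by the result of `json.load` on the downloaded deltacommit file. Running this on a fast and a slow commit side by side shows whether the extra time is spread across all file groups or concentrated in a few.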
Things we tried:
The jobs that took the most time in these slow batches:
Interestingly, the Loading latest base files and Getting small files jobs typically run in less than a minute, yet during a "slow commit" these jobs take many minutes. During the "fast commits," the Building Workload Profile job appears to be where most of the commit time is spent. I do observe shuffle spill to disk, so we could use guidance on improving that as well.
We have conducted some stress testing in a comparable test environment and were able to recreate the issue even when starting on a blank table. We did the same stress testing earlier in the year, using EMR 6.15 (Hudi 0.14.0-amzn), and did not observe this behavior.
During the slow commits, I found the warnings below in the logs. They don't show up during the faster commits. I noticed that the Delete Archived instants jobs had run around the same time, which prompted me to try turning off the async archive service.
24/12/03 02:50:15 WARN PriorityBasedFileSystemView: Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary
24/12/03 02:50:17 WARN PriorityBasedFileSystemView: Routing request to secondary file-system view
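To line these warnings up with commit times, the driver log can be scanned for them. This is a minimal sketch that assumes the log keeps the `yy/MM/dd HH:mm:ss LEVEL Class: message` layout of the two lines above; the function name is hypothetical.

```python
import re
from datetime import datetime

# Matches the WARN lines emitted by PriorityBasedFileSystemView, as shown above.
LOG_LINE = re.compile(
    r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) WARN PriorityBasedFileSystemView: (.*)$"
)


def fallback_warnings(lines):
    """Return (timestamp, message) pairs for file-system view fallback warnings."""
    hits = []
    for line in lines:
        match = LOG_LINE.match(line.strip())
        if match:
            timestamp = datetime.strptime(match.group(1), "%y/%m/%d %H:%M:%S")
            hits.append((timestamp, match.group(2)))
    return hits


log_lines = [
    "24/12/03 02:50:15 WARN PriorityBasedFileSystemView: Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary",
    "24/12/03 02:50:17 WARN PriorityBasedFileSystemView: Routing request to secondary file-system view",
]

for timestamp, message in fallback_warnings(log_lines):
    print(timestamp.isoformat(), message)
```

Comparing the resulting timestamps with the start and end times of each deltacommit shows whether every slow commit coincides with a fallback to the secondary view.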
Turning off the async archive service alleviated the periodic extremely slow commits; however, we still believe commit times are lackluster given the load and cluster size.
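As a sketch of that change, assuming it was made by overriding `hoodie.archive.async` (the key that appears in the hudi-defaults listed in this post) and leaving automatic archiving on, the override could look like this. The variable names are illustrative.

```python
# Archive-related cluster defaults, copied from the hudi-defaults in this post.
hudi_defaults = {
    "hoodie.archive.async": "true",
    "hoodie.archive.automatic": "true",
}

# Assumed override: run archiving inline instead of as an async service.
archive_overrides = {
    "hoodie.archive.async": "false",
}

# Later keys win, so the override replaces the cluster default.
effective_options = {**hudi_defaults, **archive_overrides}
print(effective_options)
```

The same key could also be passed as a writer option on the streaming query instead of being changed in the cluster-wide defaults.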
What other configurations should we look at to tune the write speeds? Do we need to reconsider our table design?
Below are the Hudi configurations in our production environment. Attached are some Spark UI screenshots as well.
hudi-defaults on the cluster:
{
"hoodie.archive.async": "true",
"hoodie.archive.automatic": "true",
"hoodie.compact.inline.max.delta.seconds": "21600",
"hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
"hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
"hoodie.embed.timeline.server.async": "true",
"hoodie.schema.cache.enable": "true"
}
Spark writer configuration:
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.write.lock.dynamodb.region": "us-east-1",
"hoodie.write.lock.dynamodb.table": "...",
"hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
"path": hudi_output_path,
"hoodie.table.name": clean_table_name,
"hoodie.datasource.write.storage.type": "MERGE_ON_READ",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": record_key_fields,
"hoodie.datasource.write.precombine.field": precombine_field,
"hoodie.datasource.write.partitionpath.field": partition_fields,
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": clean_db_name,
"hoodie.datasource.hive_sync.table": clean_table_name,
"hoodie.datasource.hive_sync.partition_fields": partition_fields,
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.omit_metadata_fields": "true",
"hoodie.datasource.meta_sync.condition.sync": "true"
Thank you for your assistance!
Expected behavior
Consistent and faster commit times, ideally under 10 minutes
Environment Description
Hudi version : 0.14.1-amzn-1
Spark version : 3.5.1
Hive version : 3.1.3
Hadoop version : 3.3.6
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : yes
Building Workload Profile job / slow stage with shuffle spill