[SUPPORT] Slow commit times with Spark Structured Streaming from Kinesis to MOR Hudi table #12412

Open
sumosha opened this issue Dec 3, 2024 · 4 comments

sumosha commented Dec 3, 2024

Describe the problem you faced

We could use some guidance on a performance issue we are seeing with a Spark Structured Streaming job reading from Kinesis and writing to a MOR Hudi table on EMR 7.2 (Hudi 0.14.1-amzn-1). We have noticed very long-running commits in Hudi, and the Kinesis iterator age falls behind by up to a day or more as data flows in.

Steps to reproduce the behavior:

Some details about the load, application, and the Hudi table:

  • 20-shard Kinesis stream with around 100-500 MB per minute, with periodic spikes up to 2 GB, typically with increased load during the work week
  • Each microbatch appears to consume around 8 GB of incoming data
  • MOR table with the Simple index. Currently 100.5 GB across 886 partitions; it is a dimension table with no meaningful time-based attribute to cluster or partition by.
  • Record key is a UUID. All writes use the upsert operation. In a given batch, it is very likely that close to all partitions will have an update or insert.
  • EMR 7.2 cluster with 9 m5a.2xlarge workers (plus 1 m5a.2xlarge driver). We scaled this up to 20 workers and saw some write improvement, but still the same behaviors described below.

What we have noticed is that deltacommits typically take around 15 minutes; however, every few commits we see an excessively long deltacommit that takes around 1.5-3 hours to complete. While this commit is in progress, there are no GetRecords calls to the stream, so processing stalls. We compared the commit metrics between one of the faster commits and a slow commit and there isn't a significant difference in the amount of data being written, maybe about 30k records. What I observed was that some log file writes of as few as 16 records and ~100 KB in size took ~4.5 minutes (totalUpsertTime). This seems an excessive amount of time for a log file (not Parquet) with so few records.

Things we tried:

  • Turn down the frequency of compaction to every 6 hours - no improvement.
  • Reduce the number of records read per shard from Kinesis (in case the incoming data volume was causing memory pressure) - no improvement.
  • Turn off the async archive service - this fixed the periodic slow commit (see notes below), however commits still appear quite slow in general (15-20 minutes).
  • Upgrade to EMR 7.5.0 (Hudi 0.15) - no improvement. A sketch of the settings these experiments correspond to follows this list.
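
For reference, a minimal sketch of the writer options these experiments map to (the keys come from our configuration shown later in this issue; the values are what we tried, not a recommendation):

```python
# Sketch of the option changes tried above (illustrative values, applied to the
# same PySpark writer options dict shown later in this issue).
compaction_and_archive_tuning = {
    # Schedule compaction based on elapsed time, every 6 hours.
    "hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
    "hoodie.compact.inline.max.delta.seconds": "21600",
    # Disable the async archive service (this removed the periodic 1.5-3h commits).
    "hoodie.archive.async": "false",
}
```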

The jobs that took the most time in these slow batches:

  • Loading latest base files for all partitions (590 tasks at 9.8 minutes)
  • Getting small files from partitions (590 tasks at 9.7 minutes)
  • Building workload profile (104 tasks at 13 minutes)
  • Doing partition and writing data (~4648 tasks at 1.5+ hours; still running and only halfway through). We believe the task count correlates to the number of files written, so with 72 available cores we understand this will be slow; however, we'd like to improve write speeds here.

What is interesting is that the Loading latest base files and Getting small files jobs typically run in less than a minute, yet during these "slow commits" they take many minutes. During the "fast commits," most of the commit time appears to be spent in the Building Workload Profile job. I do observe shuffle spill to disk, so we could use guidance on improving that as well.

We have conducted stress testing in a comparable test environment and were able to recreate the issue even when starting from a blank table. We did the same stress testing earlier in the year on EMR 6.15 (Hudi 0.14.0-amzn) and did not observe this behavior.

During the slow commits I see the log lines below; during the faster commits they don't appear. I noticed they show up around the same time the Delete Archived instants job runs, which prompted me to try turning off the async archive service.
24/12/03 02:50:15 WARN PriorityBasedFileSystemView: Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary
24/12/03 02:50:17 WARN PriorityBasedFileSystemView: Routing request to secondary file-system view

Turning off the async archive service alleviated the periodic extremely slow commit, however we still find commit times lackluster given the load and cluster size.

What other configurations should we look at to tune the write speeds? Do we need to reconsider our table design?

Below are the Hudi configurations in our production environment. Attached are some Spark UI screenshots as well.

hudi-defaults on the cluster:
{
"hoodie.archive.async": "true",
"hoodie.archive.automatic": "true",
"hoodie.compact.inline.max.delta.seconds": "21600",
"hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
"hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
"hoodie.embed.timeline.server.async": "true",
"hoodie.schema.cache.enable": "true"
}
Spark writer configuration:
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.write.lock.dynamodb.region": "us-east-1",
"hoodie.write.lock.dynamodb.table": "...",
"hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider"
"path": hudi_output_path,
"hoodie.table.name": clean_table_name,
"hoodie.datasource.write.storage.type": "MERGE_ON_READ",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": record_key_fields,
"hoodie.datasource.write.precombine.field": precombine_field,
"hoodie.datasource.write.partitionpath.field": partition_fields,
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": clean_db_name,
"hoodie.datasource.hive_sync.table": clean_table_name,
"hoodie.datasource.hive_sync.partition_fields": partition_fields,
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.omit_metadata_fields": "true",
"hoodie.datasource.meta_sync.condition.sync": "true"

Thank you for your assistance!

Expected behavior

Consistent and faster commit times, ideally under 10 minutes

Environment Description

  • Hudi version : 0.14.1-amzn-1

  • Spark version : 3.5.1

  • Hive version : 3.1.3

  • Hadoop version : 3.3.6

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : yes

Building Workload Profile job / slow stage with shuffle spill (Spark UI screenshots: building_workload_profile_spill_sparkui, building_workload_profile_executor_metrics_sparkui)

ad1happy2go (Collaborator) commented

@sumosha Tasks with disk spill are normally a lot slower. Do you see spills in the fast run as well? I also noticed very high GC for them. Did you tune your GC configs? https://hudi.apache.org/docs/tuning-guide/#gc-tuning


sumosha commented Dec 4, 2024

@ad1happy2go It does appear there is spill even in the faster commit (which explains why that job seems to stay consistent at around 15 minutes).

(Spark UI screenshot: faster_commit_spill_executors)

I haven't been able to recreate the disk spill in my stress testing, so I assume it is due to the size difference in the underlying table and files being written (production is around 100 GB now; I started with a fresh table in testing and haven't built it up to a comparable size yet). I was planning to experiment with the setting mentioned in your guide: hoodie.memory.merge.fraction. Does this seem like the right track? I'm wondering if a larger instance size is warranted as this grows (perhaps fewer instances for a comparable core count).
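
For concreteness, this is the kind of change I was planning to test (a sketch only; 0.75 is just an example value to experiment with, not something we have validated):

```python
# Sketch: give the merge more of the executor memory budget, per the tuning guide.
# The value is an example to experiment with, not a validated recommendation.
hudi_options["hoodie.memory.merge.fraction"] = "0.75"
```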

We are currently on the default collector in EMR (Parallel) in production. I have switched to G1 (this is JDK 17) in stress testing, though I didn't see much change in the overall commit times. We'll move forward with G1 since it's recommended anyway.
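
For reference, a sketch of how we are enabling G1 in the stress-testing job (on EMR this would normally go in spark-defaults or the spark-submit conf; the flags are standard JVM options and the values are just what we are experimenting with):

```python
from pyspark.sql import SparkSession

# Sketch: switch executors and driver to G1 (JDK 17). -Xlog:gc* enables GC logging
# on JDK 9+; these are experimental settings, not a recommendation.
spark = (SparkSession.builder
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -Xlog:gc*")
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
    .getOrCreate())
```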

ad1happy2go (Collaborator) commented

@sumosha Is there any other difference you could point out between the fast run and the slow run? Would it be possible to share the two Hudi commit files for them? We want to understand the number of file groups updated and the time taken to update each file group. You can also send them to me directly on the Apache Hudi Slack (Aditya Goenka) if you are not comfortable sharing them here.


sumosha commented Dec 6, 2024

@ad1happy2go Hi Aditya! I PM'ed you on Slack with the commits as requested. As far as I can tell, the only difference I see is those warnings in the logs during the slow commit. Thanks!

Projects
Status: Awaiting Triage