
Enable the Spark Operator to launch applications using user-defined mechanisms beyond the default spark-submit #2337

c-h-afzal opened this issue Nov 26, 2024 · 18 comments

@c-h-afzal
Contributor

What feature would you like to be added?

The ability for the Spark Operator to adopt a provider pattern (a pluggable mechanism) for launching Spark applications, so that a user can specify an option other than spark-submit to launch Spark applications.

Why is this needed?

In the current implementation the Spark Operator invokes spark-submit to launch a Spark application in the cluster. From our testing we have determined that the JVM spin-up penalty causes a significant increase in job latency when the cluster is under stress/heavy load, i.e. the rate at which Spark applications are enqueued is higher than the rate at which they are dequeued, causing the Spark Operator's internal queue to swell and job latencies to suffer. We want to be able to launch Spark applications using native Go, without the JVM spin-up that spark-submit incurs.

Describe the solution you would like

The solution we are proposing (and are willing to contribute, if consensus can be reached) is for the Spark Operator to allow its only launch mechanism, spark-submit, to be replaced by a user-specified one. The default mechanism remains spark-submit; users can specify their own plugin to launch Spark applications a different way. Specifically (here at the Salesforce Big Data Org), in our fork we create driver pods directly in Go and skip the JVM penalty. The workaround was devised by @gangahiremath and is mentioned in issue #1574.

Our workaround ports the functionality of spark-submit to Go and significantly reduces the time it takes for a SparkApplication CRD object to be created and then transition to the SUBMITTED state. If there is enough interest in our approach, we plan to open-source Ganga's workaround as well.
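
To make the idea concrete, here is a minimal sketch of what such a pluggable launch mechanism might look like in Go. It is not taken from the design doc or from our fork; the Launcher interface, the registry, and both implementations are hypothetical placeholders, and SparkApplication stands in for the operator's v1beta2 CRD type.

```go
// Minimal sketch of a pluggable launcher; all names here are illustrative, not
// from the actual design doc. SparkApplication is a stand-in for the CRD type.
package main

import (
	"context"
	"fmt"
)

// SparkApplication is a placeholder for the operator's v1beta2.SparkApplication.
type SparkApplication struct {
	Namespace, Name string
}

// Launcher abstracts how a SparkApplication is turned into a running driver.
type Launcher interface {
	Submit(ctx context.Context, app *SparkApplication) error
}

// registry maps a user-chosen name (e.g. from a controller flag) to a Launcher.
var registry = map[string]Launcher{}

// Register adds a launcher implementation under a name.
func Register(name string, l Launcher) { registry[name] = l }

// sparkSubmitLauncher would wrap today's behavior: build args and exec spark-submit.
type sparkSubmitLauncher struct{}

func (sparkSubmitLauncher) Submit(ctx context.Context, app *SparkApplication) error {
	fmt.Printf("exec spark-submit for %s/%s\n", app.Namespace, app.Name)
	return nil
}

// nativeLauncher would build the driver pod spec in Go and create it via client-go,
// skipping the JVM spin-up entirely.
type nativeLauncher struct{}

func (nativeLauncher) Submit(ctx context.Context, app *SparkApplication) error {
	fmt.Printf("create driver pod directly for %s/%s\n", app.Namespace, app.Name)
	return nil
}

func main() {
	Register("spark-submit", sparkSubmitLauncher{}) // remains the default
	Register("native", nativeLauncher{})

	app := &SparkApplication{Namespace: "default", Name: "spark-pi"}
	// The controller would pick the launcher based on user configuration.
	if err := registry["native"].Submit(context.Background(), app); err != nil {
		fmt.Println("submit failed:", err)
	}
}
```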

Describe alternatives you have considered

To improve latencies we have considered the pending PR that claims a performance boost by having a single queue per app. However, we have not observed the claimed performance gains in our testing. We still find JVM spin-up time to be the bottleneck, hence this proposal.

Additional context

No response


@yuchaoran2011
Contributor

Thanks for the thoughtful proposal. If the Go-native way of creating the Spark driver is inherently faster than spark-submit, I would argue that it makes sense to use it as the default. Do you see any limitations compared to spark-submit?

@c-h-afzal
Contributor Author

@yuchaoran2011 - Thanks for your response. So there are a couple of reasons:

  1. We'll have to maintain functional parity with spark-submit for launching Spark applications. Any changes/additions in spark-submit would need to be replicated in the native Go alternative.

  2. Depending on workload characteristics, some users may not experience significant performance gains. In our testing we have always found the Go workaround to outperform spark-submit invocations, but the author of PR #1990 mentioned that they didn't see performance improvements when using our PR. So we don't want to propose our workaround as the default until we have enough support/anecdotal evidence from the community to make the switch.

Please let me know if you have any more questions. Thank you.

@yuchaoran2011
Contributor

All valid points. Maintaining feature parity with spark-submit will be a long-term effort that the community is willing to shoulder. Before contributing the code changes, I think the community can decide on a direction more easily if you can provide a document detailing the overall design (i.e. how the pluggable driver creation mechanism works) and the benchmarking you did comparing your implementation with the current code base. Tagging the rest of the maintainers for their thoughts as well @ChenYi015 @vara-bonthu @jacobsalway

@bnetzi

bnetzi commented Nov 27, 2024

I think this is a great feature, but I still think it should be opt-in.
We actually tried in our env to use the proposed code here - master...gangahiremath:spark-on-k8s-operator:master

And it does work, but there were some minor differences, which I don't recall exactly (I believe it was something related to volume mounts on pod templates), that we had to implement ourselves.

The point is that spark-submit behavior can differ significantly per configuration; making a replacement that is equivalent for all usage types would require an ongoing effort.

Also, for now the Spark Operator can support multiple Spark versions without any issues. With this approach, if spark-submit changes in the future, the Spark Operator would either need to know which Spark version it is running and change its behavior accordingly, or be coupled to a specific Spark version.
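
As a purely illustrative example of the parity burden described above, here is a small sketch of one thing a native submitter has to reproduce by hand that spark-submit normally handles: propagating a user-declared volume onto the driver pod. The volume, mount path, container name, and image are assumptions for the example, not the operator's actual code.

```go
// Illustrative only: a native submitter must wire volumes/mounts onto the driver
// pod itself, matching what spark-submit would have produced for the same config.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// What the user declared on the SparkApplication (hypothetical example values).
	appVolume := corev1.Volume{
		Name:         "checkpoint",
		VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
	}
	driverMount := corev1.VolumeMount{Name: "checkpoint", MountPath: "/tmp/checkpoint"}

	// What the native submitter must build itself: a driver pod spec carrying both.
	driverPod := corev1.PodSpec{
		Volumes: []corev1.Volume{appVolume},
		Containers: []corev1.Container{{
			Name:         "spark-kubernetes-driver", // assumed driver container name
			Image:        "spark:3.5.0",             // assumed image
			VolumeMounts: []corev1.VolumeMount{driverMount},
		}},
	}
	fmt.Printf("driver pod has %d volume(s) and %d mount(s)\n",
		len(driverPod.Volumes), len(driverPod.Containers[0].VolumeMounts))
}
```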

@c-h-afzal
Contributor Author

@yuchaoran2011 - We'll work on a design and share it with the community. Thanks.

@c-h-afzal
Contributor Author

c-h-afzal commented Dec 10, 2024

Hey guys, we have the design doc ready. Please feel free to add any comments/feedback. I also added a PR link in the doc that demonstrates what the changes would eventually look like. Thanks.

fyi @yuchaoran2011

@yuchaoran2011
Contributor

@c-h-afzal Requested access. Could you open up read-only access for everyone so that the community can review and comment?

@c-h-afzal
Contributor Author

@yuchaoran2011 Ah, I'd love to, but since the doc is on a Salesforce account, company policy restricts public access. Let me check internally whether there's a way for me to make the doc public. In the meantime, I have given you access.

@c-h-afzal
Contributor Author

Happy New Year! :) Wondering if you got a chance to go through the doc and have any feedback to share? I have requested that the doc be accessible without requiring an access request, but I have to get through some red tape/approvals before the doc is free of the request-access wall. Thanks.

@bnetzi

bnetzi commented Jan 9, 2025

Hi @c-h-afzal - I read the design doc and it looks good

@andreyvelich
Member

Hi @c-h-afzal, could you please open comment access on your design doc to the Kubeflow Discuss Google group, so that all Kubeflow community members can see it? https://groups.google.com/g/kubeflow-discuss

[email protected]

@c-h-afzal
Contributor Author

@andreyvelich - Done. Kindly see if you can access now.

My apologies, but I was unable to make it universally accessible given Salesforce internal policies. I can't change the access levels myself, and from talking to the support staff it doesn't seem like the company allows broader accessibility than the document already has.

@vara-bonthu
Contributor

@c-h-afzal Love the proposal! I’ve left a comment in the document. Would love to get more feedback from other contributors as well

On a similar topic, we’re planning to conduct load tests on Spark Operator to test various scenarios and measure launch times under high concurrency of jobs. We’ve secured some capacity using AWS EKS for this purpose.

We'll share the load test documentation with the broader community soon. Would you be able to provide examples where latency has been observed (e.g., number of concurrent jobs, total number of pods, etc.)? We plan to test these with the new v2.1.0 tag and repeat the same tests after future performance improvements like your proposal.

@bnetzi Please mention your load scenarios as well. Thanks

@bnetzi

bnetzi commented Jan 31, 2025

Hi @vara-bonthu, so we've tested the following workload. The results were impressive enough for us to deploy the new operator on our production clusters, and we have been using it for 2 weeks now. We didn't save the exact results, since it was obviously good enough for us, but I do have a summary.

The workload was chosen to be similar to our peak production workload in terms of app creation rate per cluster:

Our demo Spark app opens one executor, runs some dummy code for ~5 minutes, and shuts down.

Submit rate (a rough sketch of such a submission loop is shown after the list):

  1. Running 100 apps as fast as one machine can (took ~1 minute), repeated for 5 batches (500 apps total), waiting 30 seconds between batches.
  2. For one hour in a loop: run 40 apps (~one per second), wait 20 seconds, then run another 40 apps.
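
For reference, a rough sketch of a submission loop like scenario 1 above (batches applied as fast as possible with a pause between them); the manifest file, name placeholder, and kubectl-based apply are assumptions, not the actual tooling we used:

```go
// Hypothetical load generator: 5 batches of 100 SparkApplications applied as fast
// as possible, with a 30-second pause between batches. Assumes kubectl is on PATH
// and demo-app.yaml contains a SparkApplication with NAME_PLACEHOLDER in its name.
package main

import (
	"fmt"
	"os/exec"
	"time"
)

func main() {
	const batches, appsPerBatch = 5, 100
	for b := 0; b < batches; b++ {
		for i := 0; i < appsPerBatch; i++ {
			name := fmt.Sprintf("demo-app-%d-%d", b, i)
			// Render the manifest with a unique name and apply it.
			cmd := exec.Command("sh", "-c",
				fmt.Sprintf("sed 's/NAME_PLACEHOLDER/%s/' demo-app.yaml | kubectl apply -f -", name))
			if out, err := cmd.CombinedOutput(); err != nil {
				fmt.Printf("submit %s failed: %v\n%s\n", name, err, out)
			}
		}
		time.Sleep(30 * time.Second) // pause between batches
	}
}
```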

Our specs:

We are using an EKS cluster, so we can't know the actual k8s control plane size, but we were told by support that at the time of the test it was already scaled up to its maximum size. I'm not sure exactly what that means, but we didn't have any k8s control plane issues (which might be an issue for others, since we use a large QPS config, as stated below).

spark operator controller pod:
31 vCPUs
100 GB memory

Worker queue config
bucketQPS: '1000'
bucketSize: '2000'

Controller config
workers: 100
maxTrackedExecutorPerApp: '1'

Results: in general everything ran smoothly without delays, and all apps (just some demo apps) finished successfully, but there were some interesting outcomes.

CPU usage hit 100% at peak (when submitting a hundred apps within a span of a minute).
Memory utilization was very low; I don't remember the exact numbers, but it was low enough to be a non-issue.

For each app we had a service that measured the time between the SparkApplication record creation and the actual driver pod creation. We also checked the status of each app every 5 minutes and recorded its final status.
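
A minimal sketch of how such a measurement could be taken (this is not the actual service): compare the SparkApplication's creationTimestamp with its driver pod's creationTimestamp. The namespace, app name, and label selector are assumptions based on the labels the operator and Spark typically attach to driver pods.

```go
// Sketch: measure the delay between SparkApplication creation and driver pod creation.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	dyn := dynamic.NewForConfigOrDie(cfg)
	kube := kubernetes.NewForConfigOrDie(cfg)

	// Fetch the SparkApplication object (assumed name/namespace).
	gvr := schema.GroupVersionResource{Group: "sparkoperator.k8s.io", Version: "v1beta2", Resource: "sparkapplications"}
	app, err := dyn.Resource(gvr).Namespace("default").Get(context.TODO(), "demo-app", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// Find its driver pod via the labels the operator/Spark normally set (assumed).
	pods, err := kube.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{
		LabelSelector: "sparkoperator.k8s.io/app-name=demo-app,spark-role=driver",
	})
	if err != nil || len(pods.Items) == 0 {
		panic("driver pod not found")
	}

	delay := pods.Items[0].CreationTimestamp.Sub(app.GetCreationTimestamp().Time)
	fmt.Printf("submission delay for demo-app: %s\n", delay)
}
```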

For the most part the app creation time was near zero (sub-second). Under the heaviest workload it went up to 15 seconds.
But an interesting behavior during the one-hour test (and it repeated itself, as I ran the tests 3 times) was that after ~30 minutes the delay went up to 30 seconds for ~10 minutes and then went back down.

Also, some apps finished successfully but their state was stuck on RUNNING. We have a 'zombie killer' service in our prod env, so it wasn't a concern for us, but it is probably something that should be investigated.

@c-h-afzal
Contributor Author

@vara-bonthu Apologies for the delayed response. For us the hardware setup was

130 nodes in the cluster
16 GiB memory and 4 CPUs per node

A single instance of Spark Operator was run on a single node with resource limits of 4 CPUs and 14 Gi memory.

Next we ran different scenarios with the community version of the Spark Operator (SO) and our modified version. For each scenario we submitted the same Spark job, which computes the value of Pi and runs for a little over 100 seconds. The rate of submission was 1 job per second via a script. We measured the median time it takes for each job to complete, from the time the SparkApplication CRD is created to the time it is marked as completed. Obviously, each job takes the same constant time to compute the value of Pi once scheduled, but the time it sits in the SO queue changes with the load.

The scenarios we ran were:

50 jobs at the rate of 1 job per second
The median time for a job to complete with the community version of SO on our test-bed was 276 seconds, whereas it was 40 seconds with our modified version.

900 jobs at the rate of 1 job per second
The median time for a job to complete with the community version of SO was 1087 seconds, whereas it was 41 seconds with our modified version.

We ran some other permutations of tests too, tinkering with command-line params, etc., but each time we found that our modified SO, which launches Spark applications natively, outperformed the community version with spark-submit. We arrived at the following conclusions from our tests:

  1. spark-submit adds latency to launching jobs because it spins up a new JVM each time.

  2. The Achilles' heel of the Spark Operator is its single queue for Spark applications. This is the classic producer/consumer problem: one can overwhelm the queue simply by making the rate at which jobs are enqueued exceed the rate at which they are launched/dequeued. The net effect is that the queue grows for as long as the load spike lasts, which in turn increases job latencies. No matter how much hardware we throw at the Spark Operator, we can always make the job submission rate high enough for the queue to grow. Nevertheless, the rate of consumption from the queue is much higher in our modified version of the SO, since launching a Spark job natively takes much less time than launching it with spark-submit. In our tests it took a very high load, around 4500 jobs submitted concurrently by 5 shell scripts without any sleeps, to overwhelm our modified SO. However, the fact remains that we can only scale the SO vertically by running it on ever more powerful hardware; we can't scale it horizontally because of the inherent design.

The previous work by bnetzi tried to address the single queue issue and our modification alleviates the problem by helping the SO increase its rate of consumption.

Sorry if this is unneeded/too much information, but this was really the gist/crux of our experimentation with the Spark Operator. Please let me know if you need any more information or clarification.

@c-h-afzal
Contributor Author

@yuchaoran2011 @vara-bonthu - I was wondering if we have your blessing and buy-in for the proposed design? If yes, we can raise PRs for the feature.

Also on our end we are working to open-source our native submit plugin. Thanks.

@bnetzi

bnetzi commented Feb 8, 2025

@c-h-afzal - before addressing your important notes, I'd be happy to know whether you modified the Spark Operator configuration itself before running the tests.
Also, which Spark version did you use as the base for the operator?

As for your remarks: first of all, I agree with the conclusion that the main issue with the Spark Operator is that it does not scale out, only up (vertically).

I don't think my previous solution really solved it, as it was only a bypass to keep the single worker from getting stuck on a single mutex (which was the case for the previous Spark Operator version).

The current operator handles it with a much better worker threads mechanism, although it is CPU intensive mostly because of the JVM overhead.

A real solution would be to separate one component that 'observes' from worker pods that act: probably one controller node and a set of worker pods that communicate via an external queue and scale out with KEDA according to CPU and queue metrics.
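
Purely as an illustration of that observe/act split (not an existing design), the worker side could be a loop like the sketch below; the Queue interface, SubmissionTask type, and launch callback are all hypothetical, and the channel-backed queue only stands in for an external queue such as SQS or Redis.

```go
// Sketch of a worker loop for a split controller/worker architecture. KEDA could
// scale the number of worker replicas based on queue depth and CPU.
package main

import (
	"context"
	"log"
	"time"
)

// SubmissionTask is the unit of work handed from the observing controller to workers.
type SubmissionTask struct {
	Namespace, Name string
}

// Queue abstracts the external queue shared by the controller and workers.
type Queue interface {
	Enqueue(ctx context.Context, t SubmissionTask) error
	Dequeue(ctx context.Context) (SubmissionTask, error)
}

// chanQueue is an in-memory stand-in for an external queue, used only for this demo.
type chanQueue chan SubmissionTask

func (q chanQueue) Enqueue(ctx context.Context, t SubmissionTask) error { q <- t; return nil }
func (q chanQueue) Dequeue(ctx context.Context) (SubmissionTask, error) {
	select {
	case t := <-q:
		return t, nil
	case <-ctx.Done():
		return SubmissionTask{}, ctx.Err()
	}
}

// runWorker is the loop each worker pod would run.
func runWorker(ctx context.Context, q Queue, launch func(context.Context, SubmissionTask) error) {
	for {
		task, err := q.Dequeue(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return // shutting down
			}
			log.Printf("dequeue failed: %v", err)
			continue
		}
		if err := launch(ctx, task); err != nil {
			log.Printf("launch %s/%s failed: %v", task.Namespace, task.Name, err)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	q := make(chanQueue, 10)
	_ = q.Enqueue(ctx, SubmissionTask{Namespace: "default", Name: "spark-pi"})
	runWorker(ctx, q, func(ctx context.Context, t SubmissionTask) error {
		log.Printf("would create driver pod for %s/%s", t.Namespace, t.Name)
		return nil
	})
}
```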

I believe your solution is important, but it mainly reduces CPU and memory usage, which is significant, but at some point we hit the same bottleneck.

Let me know what you think

@c-h-afzal
Contributor Author

@bnetzi - so we changed the controller worker-thread config to a higher number. But given we had only 4 CPUs, increasing the thread count beyond 4 wouldn't help (the CPUs we used weren't hyper-threaded), but we still set it to double digits. That's the primary config setting we changed. The maxTrackedExecutorPerApp config wouldn't be helpful, as the Pi job spawns a single executor. We didn't experiment with the bucket* configs.

Regarding the versions, we had a mix going on here. We were using a very old beta version in prod that hadn't been updated with the latest changes, so we tested this beta version (I can find the exact one if you are interested). Then we tested our modifications on a fork of this beta version. Then we also tested the community master branch with changes up to early Sept 2024. Finally, we tested a version with the PR you authored previously. By the way, we also tested with some JDK optimization flags to see whether they would affect JVM spin-up times, but the differences were insignificant.

Yes, I understand the mutex fix you made, but your work also indirectly addresses the single-queue shortcoming, though I agree it wasn't the solution.

Your idea for a solution definitely sounds promising, but we'll have to think through alternatives, if any. This would likely be an involved rewrite of the Spark Operator.

Yes, you have hit the nail on the head! Our solution just 'alleviates' the problem but does not solve it. And yes, for a high enough load we'll eventually end up with the same bottleneck. Our solution best fits in the category of an optimization.

Appreciate you reading through and commenting :)
