[Python] Expand SDF by default in PortableRunner #37965
Eliaaazzz wants to merge 2 commits into apache:master
Enable translations.expand_sdf in PortableRunner's default pre-optimization path so Python Read transforms are expanded for portable runners like Spark. Also add optimizer coverage for default SDF expansion, explicit pre_optimize=expand_sdf, and bounded Read expansion.

Refs apache#24422.
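The phase selection described in this commit message can be pictured with a small stand-alone sketch. It is a toy model, not PortableRunner's code: phases are plain strings rather than functions from `translations`, the helper names `lookup_experiment` and `select_phases` are hypothetical, and the contents of `DEFAULT_PHASES` (other than `expand_sdf`, which this PR adds) are an assumption made for illustration.

```python
# Toy model of pre-optimization phase selection. Not Beam's implementation.
# DEFAULT_PHASES is an assumed list; only "expand_sdf" comes from this PR.
DEFAULT_PHASES = ["lift_combiners", "expand_sdf"]
KNOWN_PHASES = set(DEFAULT_PHASES)


def lookup_experiment(experiments, key, default):
    """Return the value of the first 'key=value' experiment, else default."""
    for experiment in experiments:
        name, sep, value = experiment.partition("=")
        if name == key and sep:
            return value
    return default


def select_phases(experiments):
    """Map the pre_optimize experiment to an ordered list of phase names."""
    setting = lookup_experiment(experiments, "pre_optimize", "default").lower()
    if setting == "none":
        return []
    if setting == "default":
        return list(DEFAULT_PHASES)
    phases = []
    for name in setting.split(","):
        if name not in KNOWN_PHASES:
            raise ValueError("Unknown or inapplicable phase: " + name)
        phases.append(name)
    return phases


# With no experiment set, the default list (now including expand_sdf) applies.
default_phases = select_phases([])
# --experiments=pre_optimize=expand_sdf selects only the named phase.
explicit_phases = select_phases(["pre_optimize=expand_sdf"])
```

In this sketch the explicit form yields a list containing only `expand_sdf`, while the default form yields the whole assumed default list. That difference is why the two cases call for separate optimizer tests.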
Force-pushed: 60a831e to 169645d
Assigning reviewers:

R: @damccorm for label python.

The PR bot will only process comments in the main thread (not review comments).
Force-pushed: e54f3d3 to 2641d1f
Fixes #24422.
Python `iobase.Read` uses a Splittable DoFn (SDF) internally. PortableRunner's default batch pre-optimization path did not include `translations.expand_sdf`, so portable runners that do not support SDFs natively can receive a single SDF `ParDo` for reads. In Spark, that means Python reads such as `ReadFromParquet` may execute on a single partition without parallelization.

This change:

- adds `translations.expand_sdf` to PortableRunner's default optimization phases
- handles `--experiments=pre_optimize=expand_sdf` as an explicit custom phase
- adds optimizer coverage for default SDF expansion and for explicit `pre_optimize=expand_sdf`
- adds optimizer coverage for `beam.io.Read(BoundedSource)` expansion, including the expected `RESHUFFLE`

The bounded read test covers the issue scenario behind `ReadFromParquet` and similar Python `Read` transforms by verifying that the optimized pipeline contains the SDF component stages:

- `PAIR_WITH_RESTRICTION`
- `SPLIT_AND_SIZE_RESTRICTIONS`
- `PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS`
- `RESHUFFLE`

Local verification:

- `git diff --check`
- `python -m compileall sdks/python/apache_beam/runners/portability/portable_runner.py sdks/python/apache_beam/runners/portability/portable_runner_test.py`
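To illustrate why the expanded form parallelizes where a single SDF `ParDo` does not, the component stages named above can be mimicked in plain Python. This is a minimal sketch under stated assumptions: it uses no Beam APIs, the function names simply echo the stage names, integer ranges stand in for source restrictions, and placing the reshuffle between splitting and processing is an assumption of this toy model rather than a statement about the optimized pipeline's exact shape.

```python
from collections import defaultdict


def pair_with_restriction(source_size):
    # PAIR_WITH_RESTRICTION: attach one restriction covering the whole source.
    return [("source", (0, source_size))]


def split_and_size_restrictions(pairs, chunk):
    # SPLIT_AND_SIZE_RESTRICTIONS: cut each restriction into sized sub-ranges.
    sized = []
    for element, (start, stop) in pairs:
        for lo in range(start, stop, chunk):
            hi = min(lo + chunk, stop)
            sized.append(((element, (lo, hi)), hi - lo))
    return sized


def reshuffle(sized, num_workers):
    # RESHUFFLE: redistribute the pieces (round-robin here, for determinism).
    buckets = defaultdict(list)
    for index, item in enumerate(sized):
        buckets[index % num_workers].append(item)
    return dict(buckets)


def process_sized_elements_and_restrictions(bucket):
    # PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS: read each assigned sub-range.
    records = []
    for (_element, (lo, hi)), _size in bucket:
        records.extend(range(lo, hi))
    return records


pairs = pair_with_restriction(10)
sized = split_and_size_restrictions(pairs, chunk=4)
buckets = reshuffle(sized, num_workers=2)
results = {
    worker: process_sized_elements_and_restrictions(bucket)
    for worker, bucket in buckets.items()
}
```

Without the split and redistribution steps, the single restriction `(0, 10)` would be processed in one place. With them, the ten records are divided between the two hypothetical workers, which is the behavior the bounded read test is meant to protect.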