Spark: Fix row lineage inheritance for distributed planning #13061
Conversation
{
  "testhadoop",
  SparkCatalog.class.getName(),
  ImmutableMap.of("type", "hadoop"),
  FileFormat.PARQUET,
  false,
  WRITE_DISTRIBUTION_MODE_HASH,
  true,
  null,
  DISTRIBUTED,
  3
},
I'll see how much more time this adds, but at least in the interim I feel like it's worth having this, as it's what caught the issue.
What we can probably do once the vectorized reader change is in is remove the Parquet + local test above, since the vectorized reader already covers local. Then we'll still have coverage of both local and distributed without multiple Parquet local cases like we have right now.
Technically you could also override parameters() in TestRowLevelOperationsWithLineage so that the test matrix is only increased for those tests and not across all tests.
Good point. I like that better because now we can explicitly see what we're testing against in this class and what we're not testing due to lack of support. Went ahead and did this override.
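For context, here is a rough sketch of what such an override could look like. This is illustrative only: the annotation comes from the parameterized test framework the suite already uses, and the positional meanings noted in the comments (vectorized, fanout, branch, planning mode, format version) are my reading of the tuple shown in the diff above, not necessarily the exact code in this PR. It is meant to sit inside the test class, with the base class's imports and constants assumed.

```java
// Hypothetical sketch: redeclare the parameter matrix locally in
// TestRowLevelOperationsWithLineage so only this class's matrix grows,
// leaving the inherited suites untouched.
@Parameters
public static Object[][] parameters() {
  return new Object[][] {
    {
      "testhadoop",
      SparkCatalog.class.getName(),
      ImmutableMap.of("type", "hadoop"),
      FileFormat.PARQUET,
      false, // vectorized (assumed meaning)
      WRITE_DISTRIBUTION_MODE_HASH,
      true, // fanout writes (assumed meaning)
      null, // branch (assumed meaning)
      LOCAL, // planning mode
      3 // format version (assumed meaning)
    },
    {
      // Same configuration, but with distributed planning -- the case that
      // originally surfaced the missing firstRowId inheritance.
      "testhadoop",
      SparkCatalog.class.getName(),
      ImmutableMap.of("type", "hadoop"),
      FileFormat.PARQUET,
      false,
      WRITE_DISTRIBUTION_MODE_HASH,
      true,
      null,
      DISTRIBUTED,
      3
    }
  };
}
```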
@@ -36,6 +36,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
  private Long addedSnapshotId = null;
  private Integer content = null;
  private Long sequenceNumber = null;
  private Long firstRowId = null;
Ok, this may not be quite right; I forgot about delete manifests...
Why would this not work for delete manifests?
It should (it'd just be null for delete manifests). I confused myself when debugging an issue with failing tests, but those failures are unrelated to whether it's a delete manifest or not. Check out my comment below on why I removed the getter in my latest update.
@@ -46,6 +47,7 @@ public static ManifestFileBean fromManifest(ManifestFile manifest) {
  bean.setAddedSnapshotId(manifest.snapshotId());
  bean.setContent(manifest.content().id());
  bean.setSequenceNumber(manifest.sequenceNumber());
  bean.setFirstRowId(manifest.firstRowId());
Do we need a proper getter for it?
On my latest push I removed the getter, because the Spark actions that read the paths in the manifest files as a DataFrame (e.g. orphan files/expire snapshots) also use ManifestFileBean, and when reading the manifest DF it was failing to find firstRowId (the existence of the getter means every record read by these actions must have this field, and if it doesn't, it fails during analysis).
The getter isn't needed for the distributed planning case, since ManifestFileBean implements ManifestFile and the firstRowId() API gets used. It's also not required for the Spark actions, which just need the minimal file info.
But it's a bit odd that this one particular field won't have a getter; let me think if there's a cleaner way. Having two different manifest file bean structures where one is even more minimal seems a bit messy, at least just for this case.
At the very least, if we go with this approach I should add an inline comment explaining why there's no getter for the field.
I'm confused, isn't there a getter?
@RussellSpitzer Yeah, so that firstRowId() implementation is required to satisfy the ManifestFile interface, but if you check out some of the other fields, for instance partition spec ID (https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java#L69), we have an additional getPartitionSpecId.
This is a bean class that gets used when reading records in Spark actions (https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L167), so we need to indicate to the Encoder above which fields we expect to be there. The way this is indicated is by having the explicit "getFoo"-style API, since I think under the hood the encoder uses some sort of reflection plus a get*-based name search to find these.
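As a rough illustration of that encoder behavior (a hypothetical sketch, not the actual BaseSparkAction code; the metadata table name and the wrapper class/method are made up), Encoders.bean derives the expected columns from the bean's get*-style accessors, so adding a getFirstRowId() getter would make firstRowId a column the source rows must provide:

```java
import org.apache.iceberg.spark.actions.ManifestFileBean;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ManifestEncoderSketch {
  // Minimal sketch: Encoders.bean() builds the expected schema from the bean's
  // getter-backed properties. If a getFirstRowId() getter existed, `firstRowId`
  // would become one of those expected columns.
  static Dataset<ManifestFileBean> readManifests(SparkSession spark, String table) {
    Encoder<ManifestFileBean> beanEncoder = Encoders.bean(ManifestFileBean.class);
    return spark
        .read()
        .format("iceberg")
        .load(table + ".all_manifests") // metadata table name is illustrative
        .as(beanEncoder); // analysis fails if a getter-backed column is absent
  }
}
```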
And we can't have the getter if it's optional?
I think so. Based on what I observed, any actions which end up exercising https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L156 fail with an unresolved column exception from Spark:
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `firstRowId` cannot be resolved. Did you mean one of the following? [`content`, `path`, `length`, `partitionSpecId`, `addedSnapshotId`].
Actually, let me just double check that this isn't just a consequence of not projecting the right fields from the manifests table...
Just feels odd that it can't be null :)
Yeah, I agree; even if the field doesn't exist, things should just work, with the field being null...
Ok @RussellSpitzer, I looked into it a bit more here. The issue I pointed out earlier regarding https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L156 is simply that when reading the manifests table into a DataFrame, we cannot deserialize the records from the manifests metadata table into the ManifestFileBean structure, because the expectation is that the field exists in the source records (it can be null, but the column must exist).
As a result, I think the main thing to decide is whether we want to keep relying on this ManifestFileBean in the base Spark procedure I linked, or compose a slightly slimmed-down structure with a more minimal field set and all the getters/setters.
My take is that we should just continue reusing it, and it's fine if some fields don't have explicit getters, since that means they aren't always required from the Encoder's perspective.
Additionally, we'll need to add first_row_id to the manifests metadata table anyway. At that point we could add back the getter, because then we could project that field when reading into this structure. But that still doesn't seem quite right, because many of the procedures don't even need to project that field.
TLDR, I think there are 3 options (I'm preferring 1):
1. Leave things as is: specific fields like first_row_id won't have explicit getters, because deserializing into the bean via the encoder creates the expectation that the record must actually have that field, which isn't always true (see the sketch below).
2. Add a custom slimmed-down version of ManifestFileBean for the Spark actions, or the inverse: introduce a bean which composes the existing bean plus a first_row_id for the distributed planning case.
3. Add back a getter when working on manifests metadata table support for first_row_id and change the projection in the Spark procedure. This seems like a backwards way of addressing the issue, imo.
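As a sketch of option 1 (using a hypothetical, stripped-down bean rather than the real ManifestFileBean), the field keeps a setter and an interface-style accessor but deliberately no get* getter, so the bean encoder never requires the column:

```java
import java.io.Serializable;

// Hypothetical, minimal stand-in for the bean (names are illustrative): the
// pattern behind option 1 is a settable field whose accessor is deliberately
// NOT a get* method, so a bean encoder does not treat it as a required column.
public class RowIdBeanSketch implements Serializable {
  private String path = null;
  private Long firstRowId = null;

  // Regular bean property: the encoder will expect a `path` column.
  public String getPath() {
    return path;
  }

  public void setPath(String path) {
    this.path = path;
  }

  // Setter only: lets the code that builds the bean (e.g. fromManifest) populate it.
  public void setFirstRowId(Long firstRowId) {
    this.firstRowId = firstRowId;
  }

  // Interface-style accessor (mirrors ManifestFile#firstRowId()); because it is
  // not named getFirstRowId(), the encoder ignores it, so rows read by the
  // Spark actions do not need to contain a firstRowId column.
  public Long firstRowId() {
    return firstRowId;
  }
}
```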
// ToDo: Remove these as row lineage inheritance gets implemented in the other readers
assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
We don't need these assumptions since the parameters are being explicitly overridden now.
WRITE_DISTRIBUTION_MODE_HASH,
true,
null,
LOCAL,
Do we need this many parameters if we are only alternating planningMode?
Fair point. I mostly just went and used these since this class inherits from the existing DML test class, but I think we can maybe slim down some of the parameters in this class by making some values constant.
The ones we'll need to test against include file format and vectorized, because those validate that the readers are plumbing inheritance correctly.
It probably doesn't add value to test row lineage against a single branch, but we should have a test that writes to main and additionally writes to another branch to make sure ID assignment/sequence numbers are still correct.
Let me see if I can slim these down as part of this change.
@RussellSpitzer I looked into it a bit more, and I think slimming this down would be quite a bit of work that I'm not sure is actually beneficial. For this class, I think we know we want to vary the following parameters for testing:
1.) Planning mode
2.) File format
3.) Vectorized
4.) Distribution mode (this is mostly to make sure that regardless of the plan we don't end up in situations where we drop the lineage fields, i.e. that the rewritten logical rules just work)
The part we should be able to leave constant is all the catalog configuration and branching, but that's deep in the inheritance path of this test. And I think we do want to keep the inheritance from the existing row-level operations tests, not only for the parameters but also for the shared helper methods for table setup, view creation for the merge, etc.
All in all, I'm also not super concerned about test times blowing up here (this test suite takes around 30 seconds at the moment).
For Spark distributed planning we use ManifestFileBean, an implementation of ManifestFile which is serializable and encodes the minimal set of manifest fields required during distributed planning. It was missing firstRowId, so null values would be propagated for the inherited firstRowId. This change fixes the issue by simply adding the firstRowId field to the bean, so it is set correctly and therefore inherited correctly during Spark distributed planning.
I discovered this when going through the DML row lineage tests and noticing we weren't exercising a distributed planning case; after enabling one, I debugged the failure. I added another test parameter set for distributed planning.