Spark: Fix row lineage inheritance for distributed planning #13061


Open · wants to merge 2 commits into main from row-lineage-distributed-planning-fix

Conversation

amogh-jahagirdar (Contributor)

For Spark distributed planning we use ManifestFileBean, an implementation of ManifestFile which is serializable and encodes the minimal set of manifest fields required during distributed planning. It was missing firstRowId, and as a result null values would be propagated for the inherited firstRowId. This fixes the issue by simply adding the firstRowId field to the bean, so that it is set correctly and as a result inherited correctly during Spark distributed planning.

I discovered this when going through the DML row lineage tests: I noticed we weren't exercising a distributed planning case, and after enabling one I debugged the failure. I added another test parameter set for distributed planning.

@github-actions bot added the spark label May 14, 2025
Comment on lines 199 to 210
{
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop"),
FileFormat.PARQUET,
false,
WRITE_DISTRIBUTION_MODE_HASH,
true,
null,
DISTRIBUTED,
3
},
@amogh-jahagirdar (Contributor, Author) commented May 14, 2025

I'll see how much more time this adds, but at least in the interim I feel it's worth having, since it's what caught the issue.

What we can probably do once the vectorized reader change is in is remove the Parquet + local test above, since the vectorized reader already tests local planning. Then we'll still have coverage of both local and distributed planning without multiple Parquet-local cases like we have right now.

@nastra (Contributor) commented May 15, 2025

Technically, you could also override parameters() in TestRowLevelOperationsWithLineage so that the test matrix is only increased for those tests rather than across all tests.
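A rough sketch of what such an override might look like, assuming the parameterized-test conventions used in Iceberg's Spark test suites; the annotation, catalog name, and trimmed parameter list here are illustrative, not the exact PR code:

// Hypothetical override in TestRowLevelOperationsWithLineage: expand the
// matrix only for this class, varying just the planning mode while the rest
// of the suite keeps its existing parameters.
@Parameters(name = "catalogName = {0}, planningMode = {1}")
public static Object[][] parameters() {
  return new Object[][] {
    {"testhadoop", PlanningMode.LOCAL},      // local planning coverage
    {"testhadoop", PlanningMode.DISTRIBUTED} // distributed planning coverage
  };
}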

@amogh-jahagirdar (Contributor, Author)

Good point. I like that better because now we can explicitly see what we're testing against in this class and what we're not testing due to lack of support. Went ahead and did this override.

@@ -36,6 +36,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
private Long addedSnapshotId = null;
private Integer content = null;
private Long sequenceNumber = null;
private Long firstRowId = null;
@amogh-jahagirdar (Contributor, Author)

Ok, this may not be quite right; I forgot about delete manifests...

Contributor

Why would this not work for delete manifests?

@amogh-jahagirdar (Contributor, Author)

It should (it'd just be null for delete manifests). I confused myself when debugging an issue with failing tests, but those failures are unrelated to whether it's a delete manifest. Check out my comment below on why I removed the getter in my latest update.

@amogh-jahagirdar force-pushed the row-lineage-distributed-planning-fix branch from 561a43e to ebbb9a1 on May 15, 2025 01:15
@@ -46,6 +47,7 @@ public static ManifestFileBean fromManifest(ManifestFile manifest) {
bean.setAddedSnapshotId(manifest.snapshotId());
bean.setContent(manifest.content().id());
bean.setSequenceNumber(manifest.sequenceNumber());
bean.setFirstRowId(manifest.firstRowId());
Contributor

Do we need a proper getter for it?

@amogh-jahagirdar (Contributor, Author) commented May 15, 2025

On my latest push I removed the getter, because the Spark actions that read the paths in the manifest files as a DataFrame (e.g. remove orphan files, expire snapshots) also use ManifestFileBean, and when reading the manifest DataFrame they were failing to find firstRowId (the existence of the getter means every record read by these actions must have this field, and if it doesn't, the read fails during analysis).

The getter isn't needed for the distributed planning case, since ManifestFileBean implements ManifestFile and the firstRowId() API gets used. It's also not required for the Spark actions, which just need the minimal file info.

But it's a bit odd that this one particular field won't have a getter; let me think about whether there's a cleaner way. Having two different manifest file bean structures, where one is even more minimal, seems a bit messy just for this case. At the very least, if we go with this approach, I should add an inline comment explaining why there's no getter for the field.
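For illustration, a sketch of how that inline comment might read, trimmed to the relevant members of ManifestFileBean (the comment wording is mine, not from the PR):

public class ManifestFileBean implements ManifestFile, Serializable {
  // Deliberately no getFirstRowId(): Spark's bean Encoder turns every get*
  // method into a required column, and the manifest rows read by the Spark
  // actions (remove orphan files, expire snapshots, ...) don't carry this field.
  private Long firstRowId = null;

  @Override
  public Long firstRowId() { // still satisfies the ManifestFile interface
    return firstRowId;
  }

  public void setFirstRowId(Long firstRowId) {
    this.firstRowId = firstRowId;
  }
}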

@amogh-jahagirdar (Contributor, Author) commented May 22, 2025

@RussellSpitzer Yeah, so that firstRowId() implementation is required to satisfy the ManifestFile interface, but if you check out some of the other fields, for instance partition spec ID https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java#L69, we have an additional getPartitionSpecId.

This is a bean class that gets used when reading records in Spark actions https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L167, so we need to indicate to the Encoder which fields we expect to be there. The way this is indicated is by having the explicit "getFoo"-style API, since under the hood the encoder uses reflection and a get*-based name search to find these fields.
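A minimal sketch of that mechanism, assuming Spark's standard bean-encoder behavior; the wrapper class and method here are hypothetical, and only the Encoders.bean usage mirrors what the action code does:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

class EncoderSketch {
  // Encoders.bean derives the expected schema from the bean's get* methods via
  // reflection, so every getter becomes a column that must resolve at analysis time.
  static Dataset<ManifestFileBean> asBeans(Dataset<Row> manifestsDF) {
    Encoder<ManifestFileBean> encoder = Encoders.bean(ManifestFileBean.class);
    // If ManifestFileBean declared getFirstRowId(), this .as(...) would fail with
    // UNRESOLVED_COLUMN whenever manifestsDF lacks a firstRowId column.
    return manifestsDF.as(encoder);
  }
}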

@RussellSpitzer (Member)

And we can't have the getter if it's optional?

@amogh-jahagirdar (Contributor, Author)

I think so. Based on what I observed, any actions which end up exercising https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L156 fail with an unresolved column exception from Spark:

[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `firstRowId` cannot be resolved. Did you mean one of the following? [`content`, `path`, `length`, `partitionSpecId`, `addedSnapshotId`].

@amogh-jahagirdar (Contributor, Author)

Actually, let me just double-check that this isn't just a consequence of not projecting the right fields from the manifests table...

@RussellSpitzer (Member)

Just feels odd that it can't be null :)

@amogh-jahagirdar (Contributor, Author)

Yeah, I agree; even if the field doesn't exist, things should just work, with the field simply being null...

@amogh-jahagirdar (Contributor, Author) commented May 29, 2025

Ok @RussellSpitzer, I looked into it a bit more. The issue I pointed out earlier regarding https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L156 is simply that, when reading the manifests table into a DataFrame, we cannot deserialize the records from the manifests metadata table into the ManifestFileBean structure, since the expectation is that the column exists in the source records (it may be null, but the column must exist).

As a result, the main decision is whether we want to continue relying on this ManifestFileBean in the base Spark action I linked, or compose a slightly slimmed-down structure with a more minimal field set and all the getters/setters.

My take is that we should just continue reusing it; it's fine if some fields don't have explicit getters, since that means they aren't required from the Encoder's perspective.

Additionally, we'll need to add first_row_id to the manifests metadata table anyway. At that point we could add back the getter, because then we could project that field when reading into this structure. But that still doesn't seem quite right, because many of the procedures don't even need to project that field.

TL;DR, I think there are 3 options (I prefer 1):

  1. Leave things as is: specific fields like first_row_id won't have explicit getters, because a getter creates the expectation that every record deserialized into the bean via the encoder actually has that field, which isn't always true.

  2. Add a custom slimmed-down version of ManifestFileBean for the Spark actions, or the inverse: introduce a bean which composes the existing bean plus a first_row_id for the distributed planning case (see the rough sketch after this list).

  3. Add back a getter when working on the manifests metadata table support for first_row_id and change the projection in the Spark actions. This seems like a backwards way of addressing the issue imo.
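For illustration only, a hypothetical sketch of the composing bean from option 2; the class name and shape are mine, not part of the PR:

import java.io.Serializable;

// Option 2 sketch: keep the action-facing bean untouched and compose it with
// firstRowId only for the distributed planning path. A real version would also
// implement ManifestFile by delegating to the wrapped bean.
public class PlanningManifestFileBean implements Serializable {
  private ManifestFileBean manifest; // the existing minimal bean
  private Long firstRowId; // inherited row lineage base; may be null

  public ManifestFileBean getManifest() {
    return manifest;
  }

  public void setManifest(ManifestFileBean manifest) {
    this.manifest = manifest;
  }

  public Long getFirstRowId() {
    return firstRowId;
  }

  public void setFirstRowId(Long firstRowId) {
    this.firstRowId = firstRowId;
  }
}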

@amogh-jahagirdar force-pushed the row-lineage-distributed-planning-fix branch 3 times, most recently from 2f146a6 to 57ac888 on May 22, 2025 16:41
Comment on lines -92 to -93
// ToDo: Remove these as row lineage inheritance gets implemented in the other readers
assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET);
@amogh-jahagirdar (Contributor, Author)

We don't need these assumptions since the parameters are being explicitly overridden now.

WRITE_DISTRIBUTION_MODE_HASH,
true,
null,
LOCAL,
@RussellSpitzer (Member)

Do we need this many parameters if we are only alternating planningMode?

@amogh-jahagirdar (Contributor, Author)

Fair point. I mostly just used these since this class inherits from the existing DML test class, but I think we can slim down some of the parameters by making some values constant.

There are some we'll need to keep testing against, including file format and vectorized, because those validate that the readers are plumbing inheritance correctly.

It probably doesn't add value to test row lineage against a single branch, but we should have a test which writes to main and additionally writes to another branch, to make sure ID assignment and sequence numbers are still correct.

Let me see if I can slim these down as part of this change

@amogh-jahagirdar (Contributor, Author) commented May 29, 2025

@RussellSpitzer I looked into it a bit more, and I think slimming this down would be quite a bit of work that I'm not sure is actually beneficial. For this class, we know we want to vary the following parameters for testing:

1. Planning mode
2. File format
3. Vectorized
4. Distribution mode (mostly to make sure that, regardless of the plan, we never drop the lineage fields, i.e. that the rewritten logical plan rules just work)

The parts we should be able to leave constant are the catalog settings and branching, but those are deep in the inheritance path of this test. And I think we do want to keep inheriting from the existing row-level operations tests, not only for the parameters but also for the shared helper methods (table setup, view creation for the merge, etc.).

All in all, I'm also not super concerned about test times blowing up here (this test suite takes around 30 seconds at the moment).

@amogh-jahagirdar force-pushed the row-lineage-distributed-planning-fix branch from 57ac888 to 5f562d5 on May 29, 2025 17:51
@github-actions bot added the core label May 29, 2025
@amogh-jahagirdar force-pushed the row-lineage-distributed-planning-fix branch 2 times, most recently from 4946f8c to 3c5312b on May 29, 2025 17:55
… is being tested can be seen more explicitly
@amogh-jahagirdar force-pushed the row-lineage-distributed-planning-fix branch from 3c5312b to 2a50f72 on May 29, 2025 18:59