
Fix parquet file loading #16

Open · wants to merge 3 commits into base: master

Conversation

@jm771 jm771 commented Jul 7, 2017

As of Spark 2.0, Spark stopped guaranteeing that Parquet files will be loaded in the order they were saved to disk. This is discussed in https://issues.apache.org/jira/browse/SPARK-20144, which I believe was reported by @icexelloss.
The problem is illustrated by the test "load from parquet" in TimeSeriesRDDSpec.scala, which fails without the rest of this pull request.

However, this issue can be resolved by sorting the partitions by the headers we've already loaded for them. The logic for this sorting lives in HeaderOrdering in RangeDependency.scala. Where two partitions contain identical keys, I've fallen back to the old behaviour of sorting by partition index, because without that fallback a lot of existing unit tests fail.
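
For illustration, here is roughly the shape of that ordering. The Header case class and HeaderOrdering below are simplified stand-ins for this comment, not Flint's actual classes; they just show the firstKey/secondKey comparison with the partition-index fallback:

```scala
// Minimal sketch of the ordering idea; simplified stand-in types, not Flint's real code.
case class Header[K](partitionIndex: Int, firstKey: K, secondKey: Option[K])

class HeaderOrdering[K](implicit keyOrd: Ordering[K]) extends Ordering[Header[K]] {
  private val secondKeyOrd: Ordering[Option[K]] = Ordering.Option(keyOrd)

  override def compare(a: Header[K], b: Header[K]): Int = {
    // Sort primarily by the keys already read from each partition's header.
    val byFirst = keyOrd.compare(a.firstKey, b.firstKey)
    if (byFirst != 0) byFirst
    else {
      val bySecond = secondKeyOrd.compare(a.secondKey, b.secondKey)
      // Fall back to the old partition-index order when the keys are identical,
      // so the existing unit tests keep their expected partition layout.
      if (bySecond != 0) bySecond
      else Integer.compare(a.partitionIndex, b.partitionIndex)
    }
  }
}

// Usage: headers saved out of key order come back sorted by key.
val headers = Seq(Header(0, 100L, Some(150L)), Header(1, 0L, Some(50L)))
val sorted  = headers.sorted(new HeaderOrdering[Long])
// sorted == Seq(Header(1, 0L, Some(50L)), Header(0, 100L, Some(150L)))
```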

This logic is tested in RangeDependencySpec.scala

The rest of the pull request is just swapping from sorting by partition index to sorting with this new ordering, and, in Conversion.scala, preventing the PartitionsIterator from undoing our hard work sorting things. Given that we've now sorted our headers by their keys, the check in RangeDependency.scala that headers are sorted by keys is no longer necessary.

CLA has been emailed in.

@icexelloss icexelloss (Member) commented Jul 10, 2017

@jm771 Thank you for the patch. This makes sense. Internally we have patched Spark to fix this issue, but it makes sense to do it in Flint as well.

This somewhat changes the meaning of "sorted" from "the data is sorted" to "the data inside each partition is sorted". I think this is fine, but I need to think more carefully about any possible undesirable behavior.

@jm771 jm771 (Author) commented Jul 11, 2017

I believe we don't really change that understanding at any level other than the lowest. At the end of fromSortedRDD we wrap our newly normalised RDD in a fresh OrderedRDD, and that RDD has its partitions sorted so that the data values increase with partition index. So for a "sorted RDD" that we are loading from, we no longer assume the partitions will be ordered, but an OrderedRDD definitely will still have sorted partitions.

The other thing worth mentioning is that I only fixed the path that TimeSeriesRDD.fromParquet takes. So if you ever wanted to write a method called, say, "fromNormalizedParquet", then the fromNormalizedSortedRDD method would also need tweaking.

s"the partition ${h2.partition.index} has the first key ${h2.firstKey}.")
}
}
val sortedHeaders = headers.sortBy(x => x).toArray

@icexelloss icexelloss (Member) Jul 12, 2017

I think sorting by only firstKey and secondKey is not sufficient.

For instance, with:

Header(Partition(0), 100, ...), Header(Partition(1), 0, ...)

We would end up with:

Header(Partition(1), 0, ...), Header(Partition(0), 100, ...)

The inconsistency between the partition index and the index in the array could cause bugs.

I think the correct way of handling this is to turn two partitions [100, 200) [0, 100) into:

RangeSplit(OrderedPartition(0), [0, 100)) RangeSplit(OrderedPartition(1), [100, 200))

Instead of:

RangeSplit(OrderedPartition(1), [0, 100)) RangeSplit(OrderedPartition(0), [100, 200))

@jm771 jm771 (Author) Jul 13, 2017

I think we're OK. We start with:
headers = Header(Partition(0), 100, ...), Header(Partition(1), 0, ...)

sortedHeaders = Header(Partition(1), 0, ...), Header(Partition(0), 100, ...)

Further down we get:
val normalizedRanges = normalisationStrategy.normalise(sortedHeaders) = [0, 100) [100, 200)
since, with my change in place, the normalisation strategies don't look at the partition index.

We then return our RangeDependencies from this method:
RangeDependency[K, P](idx, normalizedRange, parents)
where idx is the index from the zipWithIndex.

Which gives us:
RangeDependency(0, [0, 100), (Partition(1))), RangeDependency(1, [100, 200), (Partition(0)))

So at this point we have successfully re-indexed and the rest of the program can continue happily. And indeed, further up the stack in Conversion.scala, we build our RangeSplit off the index of the RangeDependency:
d => RangeSplit(OrderedRDDPartition(d.index).asInstanceOf[Partition], d.range)

which gives us
RangeSplit(OrderedRDDPartition(0), [0, 100)) RangeSplit(OrderedRDDPartition(1), [100, 200))

Just as we want.
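
To make that walkthrough concrete, here is a toy version of it in plain Scala. The case classes below are simplified stand-ins for Header / RangeDependency / RangeSplit, not the real Flint types; the point is only that the new indices come from the sorted position, not from the original partition index:

```scala
// Simplified stand-ins for the Flint types discussed above (not the real classes).
case class Header(partitionIndex: Int, firstKey: Long)
case class Range(begin: Long, end: Long) // represents [begin, end)
case class RangeDependency(index: Int, range: Range, parentPartitionIndex: Int)
case class RangeSplit(orderedPartitionIndex: Int, range: Range)

// Partitions were written to disk out of key order.
val headers = Seq(Header(0, 100L), Header(1, 0L))

// Sort by key (what the new HeaderOrdering does), then re-index via zipWithIndex.
val sortedHeaders = headers.sortBy(_.firstKey)
val normalizedRanges = Seq(Range(0L, 100L), Range(100L, 200L)) // from the normalisation step
val dependencies = sortedHeaders.zip(normalizedRanges).zipWithIndex.map {
  case ((h, range), idx) => RangeDependency(idx, range, h.partitionIndex)
}
// dependencies == Seq(RangeDependency(0, Range(0, 100), 1),
//                     RangeDependency(1, Range(100, 200), 0))

// Conversion.scala builds each split off the *new* index, so the resulting
// OrderedRDD's partition indexes increase with the data:
val splits = dependencies.map(d => RangeSplit(d.index, d.range))
// splits == Seq(RangeSplit(0, Range(0, 100)), RangeSplit(1, Range(100, 200)))
```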

Or is your claim that we need to remap the partitions before we pass them into the normalisation strategy? I feel like the normalisation strategies should rely only on data from the headers and not on information about the partitions themselves.

@jm771 jm771 (Author) Jul 13, 2017

Actually, it's much better to write a test than to try to verify this by code inspection.
I've written:
"fromSortedRDD" should "sort partitions, and have partition indexes increasing"

@dgrnbrg dgrnbrg commented Oct 1, 2019

Hey @icexelloss, if I wanted to revive and finish this PR, I see that you had a comment about which keys are sorted. Is that, as far as you know, the only blocker to this PR landing?
