Fix parquet file loading #16
base: master
Conversation
@jm771 Thank you for the patch. This makes sense. Internally we have patched Spark to fix this issue, but it also makes sense to do it in Flint. This somewhat changes the meaning of "sorted" from "the data is sorted" to "the data inside each partition is sorted". I think this is fine, but I need to think more carefully about any possible undesirable behavior.
I believe we don't really change that understanding at any level other than the lowest. At the end of fromSortedRDD we wrap our newly normalised RDD in a fresh OrderedRDD, whose partitions are sorted so that the data values increase with partition index. So for a "sorted RDD" that we are loading from, we no longer assume that the partitions will be ordered, but an OrderedRDD definitely still will have sorted partitions. The other thing worth mentioning is that I only fixed the path that TimeSeriesRDD.fromParquet takes. So if you ever wanted to write a method called, say, "fromNormalizedParquet", then the fromNormalizedSortedRDD method would also need tweaking.
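(As a minimal sketch of the invariant being described, using hypothetical names rather than Flint's real classes: first keys should be non-decreasing as the partition index increases.)

```scala
// Hypothetical stand-in for a partition summary, for illustration only.
case class SplitInfo(index: Int, firstKey: Long)

// The invariant: first keys are non-decreasing as the partition index increases.
def partitionsAreSorted(splits: Seq[SplitInfo]): Boolean = {
  val keysByIndex = splits.sortBy(_.index).map(_.firstKey)
  keysByIndex == keysByIndex.sorted
}

// Example: partition 0 starts at key 0, partition 1 starts at key 100.
assert(partitionsAreSorted(Seq(SplitInfo(0, 0L), SplitInfo(1, 100L))))
```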
s"the partition ${h2.partition.index} has the first key ${h2.firstKey}.") | ||
} | ||
} | ||
val sortedHeaders = headers.sortBy(x => x).toArray |
I think sorting only by firstKey, secondKey is not sufficient.
For instance, with:
Header(Partition(0), 100, ...), Header(Partition(1), 0, ...)
we would end up with:
Header(Partition(1), 0, ...), Header(Partition(0), 100, ...)
The inconsistency between the partition index and the index in the array could cause bugs.
I think the correct way of handling this is to turn the two partitions [100, 200), [0, 100) into:
RangeSplit(OrderedPartition(0), [0, 100)), RangeSplit(OrderedPartition(1), [100, 200))
instead of:
RangeSplit(OrderedPartition(1), [0, 100)), RangeSplit(OrderedPartition(0), [100, 200))
I think we're OK. We start with:
headers = Header(Partition(0), 100, ...), Header(Partition(1), 0, ...)
sortedHeaders = Header(Partition(1), 0, ...), Header(Partition(0), 100, ...)
Further down we get:
val normalizedRanges = normalisationStrategy.normalise(sortedHeaders) = [0, 100), [100, 200)
since, with my change in place, the normalisation strategies don't look at the partition index.
We then return our RangeDependencies from this method:
RangeDependency[K, P](idx, normalizedRange, parents)
where idx is the index from the zipWithIndex. This gives us:
RangeDependency(0, [0, 100), (Partition(1))), RangeDependency(1, [100, 200), (Partition(0)))
So at this point we have successfully re-indexed, and the rest of the program can continue happily. Indeed, further up the stack in Conversion.scala we build our RangeSplit off the index of the RangeDependency:
d => RangeSplit(OrderedRDDPartition(d.index).asInstanceOf[Partition], d.range)
which gives us:
RangeSplit(OrderedRDDPartition(0), [0, 100)), RangeSplit(OrderedRDDPartition(1), [100, 200))
just as we want.
Or is your claim that we need to have the partitions remapped before we pass them into the normalisation strategy? I feel like the normalisation strategies should rely only on data from the headers and not on info about the partitions themselves.
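(A hedged, self-contained re-run of that walkthrough, with simplified stand-ins for Header, Partition, and RangeDependency; the real classes carry more fields than this.)

```scala
// Simplified stand-ins for illustration only.
case class Partition(index: Int)
case class Header(partition: Partition, firstKey: Long)
case class SimpleRangeDependency(index: Int, firstKey: Long, parent: Partition)

val headers = Array(Header(Partition(0), 100L), Header(Partition(1), 0L))
val sortedHeaders = headers.sortBy(_.firstKey)

// zipWithIndex assigns the *new* partition index; the original partition is kept
// only as a parent, mirroring RangeDependency[K, P](idx, normalizedRange, parents).
val dependencies = sortedHeaders.zipWithIndex.map { case (h, idx) =>
  SimpleRangeDependency(idx, h.firstKey, h.partition)
}

// dependencies(0) == SimpleRangeDependency(0, 0, Partition(1))
// dependencies(1) == SimpleRangeDependency(1, 100, Partition(0))
// so RangeSplits built from `index` come out in key order, 0 then 1.
assert(dependencies.map(_.index).toSeq == Seq(0, 1))
```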
Actually, it's much better to write a test than to try to make sure this is correct by code inspection.
I've written:
"fromSortedRDD" should "sort partitions, and have partition indexes increasing"
Hey @icexelloss, if I wanted to revive and finish this PR: I see that you had a comment about which keys are sorted. Is that, as far as you know, the only blocker to this PR landing?
As of Spark 2.0, Spark stopped guaranteeing that Parquet files will be loaded in the order in which they were saved to disk. This is discussed in https://issues.apache.org/jira/browse/SPARK-20144, with the issue being reported by @icexelloss, I believe.
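(A minimal sketch of the failure mode, using standard Spark APIs; the path and column names are made up for illustration.)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-order").master("local[*]").getOrCreate()
import spark.implicits._

// Write a time-sorted dataset out as Parquet...
Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("time", "value")
  .orderBy("time")
  .write.mode("overwrite").parquet("/tmp/sorted_example.parquet")

// ...and read it back. Since Spark 2.0 the order in which the files (and hence
// partitions) come back is not guaranteed to match the write order (SPARK-20144),
// so code that assumes partition 0 holds the smallest times can break.
val readBack = spark.read.parquet("/tmp/sorted_example.parquet")
```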
This issue is illustrated by the test it should "load from parquet" in TimeSeriesRDDSpec.scala, which fails without the rest of the pull request.
However, this issue can be resolved by sorting the partitions by the headers we've already loaded for them. The logic for this sorting is found in HeaderOrdering in RangeDependency.scala. I've fallen back to the old behaviour of sorting by partition index in the case where both partitions contain identical keys, because without that fallback a lot of unit tests fail.
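(A hedged sketch of what such an ordering looks like, with simplified stand-ins for Header and Partition; the real HeaderOrdering in RangeDependency.scala compares the actual header keys.)

```scala
// Simplified stand-ins for illustration only.
case class Partition(index: Int)
case class Header(partition: Partition, firstKey: Long)

// Sort by first key; fall back to the old partition-index ordering when the
// keys tie, so behaviour is unchanged for partitions with identical keys.
object HeaderOrderingSketch extends Ordering[Header] {
  override def compare(a: Header, b: Header): Int = {
    val byKey = java.lang.Long.compare(a.firstKey, b.firstKey)
    if (byKey != 0) byKey else Integer.compare(a.partition.index, b.partition.index)
  }
}

// Example: the header starting at key 0 sorts first even though its partition index is higher.
val sorted = Seq(Header(Partition(0), 100L), Header(Partition(1), 0L)).sorted(HeaderOrderingSketch)
```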
This logic is tested in RangeDependencySpec.scala
The rest of the pull request just swaps from sorting by partition index to sorting with this new ordering, and, in Conversion.scala, prevents the PartitionsIterator from undoing our hard work sorting things. Given that we've now sorted our headers by their keys, the check in RangeDependency.scala that headers are sorted by keys is no longer necessary.
CLA has been emailed in.