Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Support adaptive bucketed scan for FilterIndexRule #332

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

sezruby
Copy link
Collaborator

@sezruby sezruby commented Jan 25, 2021

What is the context for this pull request?

What changes were proposed in this pull request?

This PR applies bucketing for Filter index if the given query is applicable for bucket pruning.
We introduced switchable config for it in #329, but it's a global config and it might be difficult to set the flag for each query.

To do this, I extracted bucket pruning logic(genBucketSet) from FileSourceStrategy in Spark.

I introduced the following config to work with the static config (spark.hyperspace.index.filterRule.useBucketSpec)

  // If INDEX_FILTER_RULE_USE_BUCKET_SPEC config is false, Hyperspace applies bucketing if
  // the given filter query is applicable for bucket pruning AND the number of selected buckets
  // is under this threshold.
  val INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD =
    "spark.hyperspace.index.filterRule.autoBucketing.threshold"
  val INDEX_FILTER_RULE_AUTO_BUCKETING_THRESHOLD_DEFAULT = "0.8"

Note: we could exclude unnecessary index files using selected bucket ids, but it'll cause some cost as InMemoryIndexFile can be different for each index file list (refer #324). Creating a new custom strategy or other class could filter the paths internally, but it'll bring some complexity in codebase. Thus for now, we set bucketSpec for pruning unnecessary data.

Does this PR introduce any user-facing change?

Yes, bucketSpec for filter index is applied if bucket pruning is applicable for the given query.

How was this patch tested?

Unit test

@sezruby sezruby force-pushed the filterbucketconfig2 branch 2 times, most recently from 55f2b62 to 2f0b295 Compare January 25, 2021 10:31
@sezruby sezruby self-assigned this Jan 25, 2021
val foundPrunedBuckets = fileSourceScanNodes.head.collect {
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
if optionalBucketSet.isDefined && (optionalBucketSet.get
.cardinality() * 1.0 < index.bucketSpec.numBuckets * 0.8) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain how you came up with 0.8?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to get the list of files after bucket pruning here and use it with useBucektSpec = false? For example, let's say only one bucket is selected. In this case, only one task will be launched to process the file (and the file can be huge). It may be better to split the files to multiple tasks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be possible if we extract genBucketSet
https://github.com/apache/spark/blob/d1177b52304217f4cb86506fd1887ec98879ed16/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L116

and use it to filter the file paths before transforming the plan.
In this way, we don't need to generate the plan twice so I think it would be good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imback82 I tried several options.

  1. create InMemoryFileIndex with filtered file list - this isn't compatible with INMEMORYFILEINDEX_INDEX_ONLY tag and it incurs additional overhead from the newly created InMemoryFileIndex. I'd like to keep the current tag behavior.
  2. IndexHadoopFsRelation returns filtered file list - we need to rewrite InMemoryFileIndex as most of spark code refers relation.location.xxx to get the file list.

A possible better(?) option is introducing a custom FileSourceScanExec or InMemoryFileIndex.. but it will cause some complexity in codebase.

(btw I found the related issue: https://issues.apache.org/jira/browse/SPARK-32986)

So.. how about setting bucketSpec for FilterIndexRule if bucket pruning is applicable for now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, maybe I wasn't clear. Is there a scenario where bucket read performs better when there are no buckets pruned?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imback82 if the config "bucketCheck.enabled" is enabled, bucket scan won't be performed if there's no pruned bucket.

#329 will be used as a static config, for experimental purposes?

bucket read might be faster when job scheduling does matter(e.g. cache) or as non-bucketed scan will depend on the default spark configs - https://github.com/apache/spark/blob/2145f07e22a53e2da2f6a424c6fd169c2bb63a48/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala#L86
it might not work well with every dataset/landscape/env setup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bucket read might be faster when job scheduling does matter(e.g. cache)

Hmm. usually scheduling is very fast that it shouldn't be a bottleneck; how much (percentage) was the scheduling overhead in the cache scenario?

OK, let me finish #329 and let me think if we can remove the inter-dependent configs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. create InMemoryFileIndex with filtered file list - this isn't compatible with INMEMORYFILEINDEX_INDEX_ONLY tag and it incurs additional overhead from the newly created InMemoryFileIndex. I'd like to keep the current tag behavior.

BTW, this seems like a better option as Spark 3.2 will also decouple bucket scan vs. bucket filter: apache/spark@76baaf7

This will require some changes in RuleUtils to support INMEMORYFILEINDEX_INDEX_ONLY but seems doable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes plan + INMEMORYFILEINDEX_INDEX_ONLY will cache it properly, but different file lists might incur unnecessary overhead. Filtering files in DataSourceScanExec is clear and doable like the link.

@rapoth rapoth added the advanced issue This is the tag for advanced issues which involve major design changes or introduction label Jan 27, 2021
@sezruby sezruby changed the title Support adaptive bucketed scan for FilterIndexRule Filter unnecessary index data files if bucket pruning is applicable Feb 4, 2021
@sezruby sezruby marked this pull request as draft February 5, 2021 00:36
@sezruby sezruby changed the title Filter unnecessary index data files if bucket pruning is applicable Support adaptive bucketed scan for FilterIndexRule Feb 5, 2021
@sezruby sezruby marked this pull request as ready for review February 6, 2021 01:29
@clee704 clee704 added the stale There was no activity for this issue/PR for a while label Jun 15, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
advanced issue This is the tag for advanced issues which involve major design changes or introduction stale There was no activity for this issue/PR for a while
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants