Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Index nested fields #365

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

andrei-ionescu
Copy link
Contributor

@andrei-ionescu andrei-ionescu commented Feb 24, 2021

What is the context for this pull request?

What changes were proposed in this pull request?

This PR adds support for indexing over nested fields (ie: structs).

The first commit is adding support for building the index over a nested (struct) field and support for modifying the search query to properly use that index. It has suport for hybrid scans for both append and delete files in the hybrid scan context.

The second commit will address the join use case.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit and integration tests for not breaking the existing functionalities.
Unit and integration test added for the new functionalities.

Search queries

The following search queries use a dataset with the following schema:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

Files Appended

The optimized plan

Union
:- Project [id#1, name#2, nested__nst__field2#3]
:  +- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
:     +- Relation[nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] 
:        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- Relation[id#1,name#2,nested#102] parquet

The Spark plan

Union
:- Project [id#1, name#2, nested__nst__field2#3]
:  +- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
:     +- FileScan parquet [nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] Batched: false, 
:          Format: Parquet, Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
:          PartitionFilters: [], PushedFilters: [IsNotNull(nested__nst__field1)], ReadSchema: 
:          struct<nested__nst__field1:string,id:int,name:string,nested__nst__field2:string>
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- FileScan parquet [id#100,name#101,nested#102] Batched: false, Format: Parquet, 
           Location: InMemoryFileIndex[file:/..../tableN2/appended_file.parquet], 
           PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: 
           struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>

Files Deleted

The optimized plan

Project [id#1, name#2, nested__nst__field2#3]
+- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
   +- Project [nested__nst__field1#0, id#1, name#2, nested__nst__field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- Relation[nested__nst__field1#0,id#1,name#2,nested__nst__field2#3,_data_file_id#368L] 
              Hyperspace(Type: CI, Name: indexWithLineage, LogVersion: 1)

The Spark plan

Project [id#1, name#2, nested__nst__field2#3]
+- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
   +- Project [nested__nst__field1#0, id#1, name#2, nested__nst__field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- FileScan parquet [nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] Batched: false, 
              Format: Parquet, Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
              PartitionFilters: [], PushedFilters: [IsNotNull(nested__nst__field1)], ReadSchema: 
              struct<nested__nst__field1:string,id:int,name:string,nested__nst__field2:string>

Join queries

The following join queries will have a dataset a bit different from the one at the beginning. The following are extracted from the HybridScanForNestedFieldsTest tests.

root
 |-- Date: string (nullable = true)
 |-- RGUID: string (nullable = true)
 |-- Query: string (nullable = true)
 |-- imprs: integer (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- leaf: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- cnt: integer (nullable = true)

Join append only

Original plan

Project [cnt#556, query#533, id#557, Date#543, id#563]
+- Join Inner, (cnt#556 = cnt#562)
   :- Project [nested#536.leaf.cnt AS cnt#556, query#533, nested#536.leaf.id AS id#557]
   :  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && (nested#536.leaf.cnt <= 40))
   :     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
   +- Project [nested#548.leaf.cnt AS cnt#562, Date#543, nested#548.leaf.id AS id#563]
      +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && (nested#548.leaf.cnt >= 20))
         +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

Altered optimized plan

Project [cnt#653, query#533, id#654, Date#543, id#660]
+- Join Inner, (cnt#653 = cnt#659)
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#653, query#1 AS query#533, nested__leaf__id#2 AS id#654]
   :  :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :  :     :      (nested__leaf__cnt#0 <= 40))
   :  :     +- Relation[nested__leaf__cnt#0,Query#1,nested__leaf__id#2] 
   :  :          Hyperspace(Type: CI, Name: index_Append, LogVersion: 1)
   :  +- RepartitionByExpression [cnt#653], 200
   :     +- Project [nested#536.leaf.cnt AS cnt#653, query#533, nested#536.leaf.id AS id#654]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- Relation[Query#533,nested#536] parquet
   +- BucketUnion 200 buckets, bucket columns: [cnt]
      :- Project [nested__leaf__cnt#0 AS cnt#659, Date#1 AS Date#543, nested__leaf__id#2 AS id#660]
      :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
      :     :      (nested__leaf__cnt#0 >= 20))
      :     +- Relation[nested__leaf__cnt#0,Date#1,nested__leaf__id#2] 
      :          Hyperspace(Type: CI, Name: indexType2_Append, LogVersion: 1)
      +- RepartitionByExpression [cnt#659], 200
         +- Project [nested#548.leaf.cnt AS cnt#659, Date#543, nested#548.leaf.id AS id#660]
            +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && 
               :      (nested#548.leaf.cnt >= 20))
               +- Relation[Date#543,nested#548] parquet

Altered Spark plan

Project [cnt#653, query#533, id#654, Date#543, id#660]
+- SortMergeJoin [cnt#653], [cnt#659], Inner
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#653, query#1 AS query#533, nested__leaf__id#2 AS id#654]
   :  :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :  :     :      (nested__leaf__cnt#0 <= 40))
   :  :     +- FileScan Hyperspace(Type: CI, Name: index_Append, LogVersion: 1) 
   :  :          [nested__leaf__cnt#0,Query#1,nested__leaf__id#2] 
   :  :          Batched: true, Format: Parquet, 
   :  :          Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...],
   :  :          PartitionFilters: [], PushedFilters: [IsNotNull(nested__leaf__cnt), 
   :  :          GreaterThanOrEqual(nested__leaf__cnt,20), LessThanOrEqual(nested__leaf__cnt,40), 
   :  :          ReadSchema: struct<nested__leaf__cnt:int,Query:string,nested__leaf__id:string>, 
   :  :          SelectedBucketsCount: 200 out of 200
   :  +- Exchange hashpartitioning(cnt#653, 200)
   :     +- Project [nested#536.leaf.cnt AS cnt#653, query#533, nested#536.leaf.id AS id#654]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- FileScan parquet [Query#533,nested#536] Batched: false, Format: Parquet, 
   :                Location: InMemoryFileIndex[file:/.../..., PartitionFilters: [], 
   :                PushedFilters: [IsNotNull(nested)], 
   :                ReadSchema: struct<Query:string,nested:struct<id:string,leaf:struct<id:string,cnt:int>>>
   +- BucketUnion 200 buckets, bucket columns: [cnt]
      :- Project [nested__leaf__cnt#0 AS cnt#659, Date#1 AS Date#543, nested__leaf__id#2 AS id#660]
      :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
      :     :      (nested__leaf__cnt#0 >= 20))
      :     +- FileScan Hyperspace(Type: CI, Name: indexType2_Append, LogVersion: 1) 
      :          [nested__leaf__cnt#0,Date#1,nested__leaf__id#2]
      :          Batched: true, Format: Parquet, 
      :          Location: InMemoryFileIndex[file://.../spark_warehouse/indexes/...], 
      :          PartitionFilters: [], PushedFilters: [IsNotNull(nested__leaf__cnt), 
      :          LessThanOrEqual(nested__leaf__cnt,40), GreaterThanOrEqual(nested__leaf__cnt,20), 
      :          ReadSchema: struct<nested__leaf__cnt:int,Date:string,nested__leaf__id:string>, 
      :          SelectedBucketsCount: 200 out of 200
      +- Exchange hashpartitioning(cnt#659, 200)
         +- Project [nested#548.leaf.cnt AS cnt#659, Date#543, nested#548.leaf.id AS id#660]
            +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && 
               :      (nested#548.leaf.cnt >= 20))
               +- FileScan parquet [Date#543,nested#548] Batched: false, Format: Parquet, 
                    Location: InMemoryFileIndex[file:/.../..., PartitionFilters: [], 
                    PushedFilters: [IsNotNull(nested)], 
                    ReadSchema: struct<Date:string,nested:struct<id:string,leaf:struct<id:string,cnt:int>>>

Delete files

Original plan

Project [cnt#556, query#533, Date#543]
+- Join Inner, (cnt#556 = cnt#560)
   :- Project [nested#536.leaf.cnt AS cnt#556, query#533]
   :  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :     :      (nested#536.leaf.cnt <= 40))
   :     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
   +- Project [nested#548.leaf.cnt AS cnt#560, Date#543]
      +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && 
         :      (nested#548.leaf.cnt >= 20))
         +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

Altered optimized plan

Project [cnt#605, query#533, Date#543]
+- Join Inner, (cnt#605 = cnt#609)
   :- Project [nested__leaf__cnt#0 AS cnt#605, query#1 AS query#533]
   :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :     :      (nested__leaf__cnt#0 <= 40))
   :     +- Project [nested__leaf__cnt#0, Query#1, nested__leaf__id#2]
   :        +- Filter NOT _data_file_id#615L IN (2,3)
   :           +- Relation[nested__leaf__cnt#0,Query#1,nested__leaf__id#2,_data_file_id#615L] 
   :                Hyperspace(Type: CI, Name: index_Delete, LogVersion: 1)
   +- Project [nested__leaf__cnt#0 AS cnt#609, Date#1 AS Date#543]
      +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
         :      (nested__leaf__cnt#0 >= 20))
         +- Project [nested__leaf__cnt#0, Date#1, nested__leaf__id#2]
            +- Filter NOT _data_file_id#616L IN (2,3)
               +- Relation[nested__leaf__cnt#0,Date#1,nested__leaf__id#2,_data_file_id#616L] 
                    Hyperspace(Type: CI, Name: indexType2_Delete2, LogVersion: 1)

Altered Spark plan

Project [cnt#605, query#533, Date#543]
+- SortMergeJoin [cnt#605], [cnt#609], Inner
   :- Project [nested__leaf__cnt#0 AS cnt#605, query#1 AS query#533]
   :  +- Filter (((NOT _data_file_id#615L IN (2,3) && isnotnull(nested__leaf__cnt#0)) && 
   :     :      (nested__leaf__cnt#0 >= 20)) && (nested__leaf__cnt#0 <= 40))
   :     +- FileScan Hyperspace(Type: CI, Name: index_Delete, LogVersion: 1) 
   :          [nested__leaf__cnt#0,Query#1,_data_file_id#615L] 
   :          Batched: true, Format: Parquet, 
   :          Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...], 
   :          PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), 
   :          IsNotNull(nested__leaf__cnt), GreaterThanOrEqual(nested__leaf__cnt,20),  
   :          ReadSchema: struct<nested__leaf__cnt:int,Query:string,_data_file_id:bigint>, 
   :          SelectedBucketsCount: 200 out of 200
   +- Project [nested__leaf__cnt#0 AS cnt#609, Date#1 AS Date#543]
      +- Filter (((NOT _data_file_id#616L IN (2,3) && isnotnull(nested__leaf__cnt#0)) && 
         :      (nested__leaf__cnt#0 <= 40)) && (nested__leaf__cnt#0 >= 20))
         +- FileScan Hyperspace(Type: CI, Name: indexType2_Delete2, LogVersion: 1) 
              [nested__leaf__cnt#0,Date#1,_data_file_id#616L] 
              Batched: true, Format: Parquet, 
              Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...], 
              PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), 
              IsNotNull(nested__leaf__cnt), LessThanOrEqual(nested__leaf__cnt,40), 
              ReadSchema: struct<nested__leaf__cnt:int,Date:string,_data_file_id:bigint>, 
              SelectedBucketsCount: 200 out of 200

Append + Delete

Original plan

Project [cnt#556, query#533, Date#543]
+- Join Inner, (cnt#556 = cnt#560)
   :- Project [nested#536.leaf.cnt AS cnt#556, query#533]
   :  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && (nested#536.leaf.cnt <= 40))
   :     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
   +- Project [nested#548.leaf.cnt AS cnt#560, Date#543]
      +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && (nested#548.leaf.cnt >= 20))
         +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

Altered optimized plan

Project [cnt#617, query#533, Date#543]
+- Join Inner, (cnt#617 = cnt#621)
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#617, query#1 AS query#533]
   :  :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :  :     :      (nested__leaf__cnt#0 <= 40))
   :  :     +- Project [nested__leaf__cnt#0, Query#1, nested__leaf__id#2]
   :  :        +- Filter NOT (_data_file_id#627L = 3)
   :  :           +- Relation[nested__leaf__cnt#0,Query#1,nested__leaf__id#2,_data_file_id#627L] 
   :  :                Hyperspace(Type: CI, Name: index_Both, LogVersion: 1)
   :  +- RepartitionByExpression [cnt#617], 200
   :     +- Project [nested#536.leaf.cnt AS cnt#617, query#533]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- Relation[Query#533,nested#536] parquet
   +- Project [nested__leaf__cnt#0 AS cnt#621, Date#1 AS Date#543]
      +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
         :      (nested__leaf__cnt#0 >= 20))
         +- Project [nested__leaf__cnt#0, Date#1, nested__leaf__id#2]
            +- Filter NOT _data_file_id#628L INSET (2,3)
               +- Relation[nested__leaf__cnt#0,Date#1,nested__leaf__id#2,_data_file_id#628L] 
                    Hyperspace(Type: CI, Name: indexType2_Delete, LogVersion: 1)

Altere Spark plan

Project [cnt#617, query#533, Date#543]
+- SortMergeJoin [cnt#617], [cnt#621], Inner
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#617, query#1 AS query#533]
   :  :  +- Filter (((NOT (_data_file_id#627L = 3) && isnotnull(nested__leaf__cnt#0)) &&
   :  :     :      (nested__leaf__cnt#0 >= 20)) && (nested__leaf__cnt#0 <= 40))
   :  :     +- FileScan Hyperspace(Type: CI, Name: index_Both, LogVersion: 1) 
   :  :          [nested__leaf__cnt#0,Query#1,_data_file_id#627L] 
   :  :          Batched: true, Format: Parquet, 
   :  :          Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...], 
   :  :          PartitionFilters: [], PushedFilters: [Not(EqualTo(_data_file_id,3)), 
   :  :          IsNotNull(nested__leaf__cnt), GreaterThanOrEqual(nested__leaf__cnt,20), 
   :  :          ReadSchema: struct<nested__leaf__cnt:int,Query:string,_data_file_id:bigint>, 
   :  :          SelectedBucketsCount: 200 out of 200
   :  +- Exchange hashpartitioning(cnt#617, 200)
   :     +- Project [nested#536.leaf.cnt AS cnt#617, query#533]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- FileScan parquet [Query#533,nested#536] Batched: false, Format: Parquet, 
   :                Location: InMemoryFileIndex[file:/.../...], 
   :                PartitionFilters: [], PushedFilters: [IsNotNull(nested)], 
   :                ReadSchema: struct<Query:string,nested:struct<id:string,leaf:struct<id:string,cnt:int>>>
   +- Project [nested__leaf__cnt#0 AS cnt#621, Date#1 AS Date#543]
      +- Filter (((NOT _data_file_id#628L INSET (2,3) && isnotnull(nested__leaf__cnt#0)) && 
         :      (nested__leaf__cnt#0 <= 40)) && (nested__leaf__cnt#0 >= 20))
         +- FileScan Hyperspace(Type: CI, Name: indexType2_Delete, LogVersion: 1) 
              [nested__leaf__cnt#0,Date#1,_data_file_id#628L] 
              Batched: true, Format: Parquet, 
              Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/..., 
              PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), 
              IsNotNull(nested__leaf__cnt), LessThanOrEqual(nested__leaf__cnt,40),
              ReadSchema: struct<nested__leaf__cnt:int,Date:string,_data_file_id:bigint>, 
              SelectedBucketsCount: 200 out of 200

@sezruby
Copy link
Collaborator

sezruby commented Mar 1, 2021

BTW thanks for the great work! and sorry for the delay in reviewing.. 🐢🦥 I'll try to do the review asap.

@sezruby
Copy link
Collaborator

sezruby commented Mar 1, 2021

Could you add logical/spark plan change in the PR description?
I'd like to see the plan of join query + filter index for one child.
e.g.

val filter = df.filter(..).select("nested.field") // index applied
val filter2 = df.filter(...) // index is not applied for this filter
val join = filter.join(filter2, ...)

@andrei-ionescu
Copy link
Contributor Author

@sezruby Thanks for reviewing this PR. I added some more code to support nested fields in joins and all hybrid scans.

I'll create start integrating your feedback.

@andrei-ionescu

This comment has been minimized.

@imback82
Copy link
Contributor

imback82 commented Mar 2, 2021

Thanks for reporting this. Build nodes must have changed. @sezruby do you have time to take a look? Otherwise, I will get to this toward the end of this week. We can rely on Scala 2.12 pipeline meanwhile.

@andrei-ionescu
Copy link
Contributor Author

@imback82 @sezruby

First, thanks for fixing the build issue for Scala 2.11.

I integrated the feedback and rebased upon master. Please have a second round of PR review. Thanks!

@sezruby
Copy link
Collaborator

sezruby commented Mar 5, 2021

@andrei-ionescu I know this change all comes together, but could you have split this PR into small PRs by any chance?
Since it's bit hard to get these in at a time with some proper reviews.

You can keep this PR for reference and I suggest following 3 PRs:

  • create & refresh
  • filter & join rule application
  • hybrid scan

BTW thanks for the plan update :) seems it works well as expected 👍

@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 5, 2021

@sezruby In regards to review effort, I won't help too much because creation and refresh parts are very small and the other two are equally in code change.

I did tackled the feature like this:

  1. First create & refresh
  2. Filter implementation with hybrid scans
  3. Join implementation with hybrid scans

I don't understand why you prefer separate PRs instead of separate commits - you can review each commit separately. Do you want to merge them separately? If so, wouldn't Hyperspace be in inconsistent state feature wise?

@sezruby
Copy link
Collaborator

sezruby commented Mar 5, 2021

@andrei-ionescu I have the same experience when I was working on Hybrid Scan last year - #123 (one big PR, but rejected 🙅) and I split the PR into several PRs (#150).
And please refer https://github.com/google/eng-practices/blob/master/review/developer/small-cls.md.

I don't understand why you prefer separate PRs instead of separate commits - you can review each commit separately.

Since these commits become 1 commit after merging to master branch.

Do you want to merge them separately? If so, wouldn't Hyperspace be in inconsistent state feature wise?

It's fine as long as they don't break the build & test.

Create & refresh can be small but it requires some utility functions & test setup.
We could also split filter & join into 2 PRs as it includes many lines of tests.

@andrei-ionescu
Copy link
Contributor Author

@sezruby The only advantage for separate PRs is if you specifically intend to merge them separately.

For better reading and understanding of the code, we can still keep a single PR but with more commits.

For me building separate PRs means a lot of extra work as I need to keep al those branches up to date through constant rebases and link them one on top of the other. There are multiple changes in the same file and any rebase will result in as many more times of conflict resolving as the branches I create for these PRs.

I would like not to go through this ordeal.

@imback82, @rapoth WDYT?

BTW: What does "CL" stand for? I've seen it in the developer docs but never explained clearly.

@imback82
Copy link
Contributor

imback82 commented Mar 8, 2021

+1 for smaller PRs if possible. For this PR, I think it would make sense to split as @sezruby suggested above since they seem easily splittable?

The rationale behind the smaller PRs is described in the doc shared above. Especially, when we have a PR that's +2500 lines long, it's really hard for reviewers to review quickly and correctly. (and reviewing by commits doesn't seem possible when each commit is also big)

For me building separate PRs means a lot of extra work as I need to keep al those branches up to date through constant rebases and link them one on top of the other. There are multiple changes in the same file and any rebase will result in as many more times of conflict resolving as the branches I create for these PRs.

Since you have this "reference" PR, you can just create a PR one by one instead of creating all of them at once? In this way, you don't need to keep many branches up to date.

I would like not to go through this ordeal.

Thanks for working on this great feature. I think smaller PRs will help both sides to iterate quicker. But if you think it's not feasible, you can still keep this PR as one, but please bear with us if it takes a very long time to get it reviewed/merged. Thanks!

(I believe CL stands for change list)?

@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 8, 2021

@imback82 Thanks for your opinion.

If you can guarantee that it will take less time and a better team focus to get everything merged as small PRs than this big PR, then I can see the its advantage. Otherwise it's just extra work on my side.

BTW, It is nowhere specified in any docs that in Hyperspace, one should be using Google's Small CLS engineering protocols.

Copy link
Collaborator

@sezruby sezruby left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did one round except for RuleUtils + tests

@@ -65,9 +65,15 @@ class CreateAction(
}

private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
// Flatten the schema to support nested fields
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Flatten the schema to support nested fields
// Flatten the schema to support nested fields.

val dfColumnNames = df.schema.fieldNames
val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
val dfColumnNames = SchemaUtils.flatten(df.schema)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some comment here? for flatten operation

val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
val dfColumnNames = SchemaUtils.flatten(df.schema)
val indexedColumns = SchemaUtils.unescapeFieldNames(indexConfig.indexedColumns)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some comment why "unescapeFieldNames" required here? e.g. nested column names are stored as escaped in index log entry.

@@ -109,4 +109,7 @@ object IndexConstants {
// To provide multiple paths in the globbing pattern, separate them with commas, e.g.
// "/temp/1/*, /temp/2/*"
val GLOBBING_PATTERN_KEY = "spark.hyperspace.source.globbingPattern"

// Indicate whether the index has been built over a nested field.
private[hyperspace] val USES_NESTED_FIELDS_PROPERTY = "hasNestedFields"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this up around 104 lines?

assert(parts.forall(_.isInstanceOf[HashPartitioning]))
assert(parts.forall(_.numPartitions == bucketSpec.numBuckets))

val reduced = parts.reduceLeft { (a, b) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some comments about reduced with some example?


/**
* Returns true if the given project is a supported project. If all of the registered
* providers return None, this returns false.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you revise the comment?


/**
* Returns true if the given filter is a supported filter. If all of the registered
* providers return None, this returns false.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

/**
* Given a nested field this method extracts the full name out of it.
*
* @param field The field from which to get the name from
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit & ditto below functions

Suggested change
* @param field The field from which to get the name from
* @param field The field from which to get the name from.

object SchemaUtils {

val NESTED_FIELD_NEEDLE_REGEX = "\\."
val NESTED_FIELD_REPLACEMENT = "__"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it safe to use double underscores?

* @param index The chosen index
* @return A collection of nested field names
*/
private def getNestedFields(index: IndexLogEntry): Seq[String] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to move these two func to IndexLogEntry. WDYT?

@imback82
Copy link
Contributor

If you can guarantee that it will take less time and a better team focus to get everything merged as small PRs than this big PR, then I can see the its advantage. Otherwise it's just extra work on my side.

Yes, I think it will definitely accelerate the review process. @sezruby, do you also agree after doing reviews on this PR?

BTW, It is nowhere specified in any docs that in Hyperspace, one should be using Google's Small CLS engineering protocols.

We have been referring to the doc internally if there is a conflict in the discussion. We can make it a formal process if needed; note that we are still improving the process in this repo as needed/required.

@sezruby
Copy link
Collaborator

sezruby commented Mar 10, 2021

Yea as most of functionality is validated in this PR, we can do small PRs only for merging them partially. It's too long scrooooll and got distracted in the middle of reviewing :D ..

@andrei-ionescu with this PR, you won't be able to get the review from @imback82 this year lol

@andrei-ionescu
Copy link
Contributor Author

andrei-ionescu commented Mar 10, 2021

We have been referring to the doc internally if there is a conflict in the discussion. We can make it a formal process if needed; note that we are still improving the process in this repo as needed/required.

I'm just saying that I would prefer to know it before hand and it would have been very useful to me to have it in the dev process docs. I would have started with it in mind and would ease up may work load. Keeping in mind that bigger features are going to come and that you desire the Hyperspace community to grow, It would be very helpful to have it stated in your docs for further big chunks of code changes.

@imback82, @sezruby: To set this clear so I wouldn't get through this process of "re-creating PRs again and again", look at the the following set of PRs that I want to create for this feature:

  1. Create & Refresh actions
  2. Filter/Search implementation
  3. Hybrid scans for filter/search
  4. Join implementation
  5. Hybrid scans for joins

Q1: Can we agree on the PRs lists above?
Q2: Do I have your agreement that you'll focus on reviewing each of them?

@sezruby
Copy link
Collaborator

sezruby commented Mar 10, 2021

@andrei-ionescu
Q1: Looks good
Q2: Yep, but 1 ~ 2 PRs (at a time) would be good; just don't create all PRs at first.

@imback82
Copy link
Contributor

Keeping in mind that bigger features are going to come and that you desire the Hyperspace community to grow, It would be very helpful to have it stated in your docs for further big chunks of code changes.

Sure, I will update the contribution guide.

Q1: Can we agree on the PRs lists above?

The list looks reasonable to me.

Q2: Do I have your agreement that you'll focus on reviewing each of them?

Sorry about reviewing the PRs late. We were not expecting a big feature from the external contributors, so I acknowledge that we didn't allocate resources correctly. We will try to fix this moving forward. And to answer your concern, yes, once the PRs are in a manageable size, we will review them as soon as we can. I expect you create these PRs one by one, and not all at once?

@andrei-ionescu
Copy link
Contributor Author

@imback82 I will create them as my time allows. If I'll be able to create all of them at once then I'll do it like that. What I can say is that I'll start creating them in the agreed order. I'll ping you guys on each of them as they land.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[PROPOSAL]: Index nested fields
3 participants