
[PROPOSAL]: Index nested fields #347

Open
andrei-ionescu opened this issue Jan 31, 2021 · 11 comments · Fixed by #379 · May be fixed by #365, #380 or #381
Labels: proposal, untriaged

Comments

@andrei-ionescu
Contributor

andrei-ionescu commented Jan 31, 2021

Problem description

This design proposal is for adding support for nested fields in both indexedColumns and includedColumns.

Currently, Hyperspace does not support nested fields like structs or structured arrays.

The ability to use nested fields with Hyperspace would be of great benefit to its users.

Goal

The user should be able to create an index on nested fields and use it in queries.

For example:

hs.createIndex(
  df, 
  IndexConfig(
    "idx_nested", 
    indexedColumns = Seq("nested.nst.field1"), 
    includedColumns = Seq("id", "name", "nested.nst.field2")
  )
)

Proposed solution

Using the same Hyperspace APIs, indexes over nested fields should work just as they do for simple fields.

Given the nestedDataset dataset with schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the following data

+---+-----+-----------------+
|id |name |nested           |
+---+-----+-----------------+
|2  |name2|[va2, [wa2, wb2]]|
|1  |name1|[va1, [wa1, wb1]]|
+---+-----+-----------------+
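
For reference, a dataset like this can be constructed as follows (a minimal sketch assuming a Spark session with its implicits in scope; the case class names are illustrative and nullability may differ slightly from the schema above):

import spark.implicits._

case class Nst(field1: String, field2: String)
case class Nested(field1: String, nst: Nst)

// Build the two sample rows shown above.
val nestedDataset = Seq(
  (2, "name2", Nested("va2", Nst("wa2", "wb2"))),
  (1, "name1", Nested("va1", Nst("wa1", "wb1")))
).toDF("id", "name", "nested")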

An index on the field1 field, with two additional projection fields, can be created as follows:

hs.createIndex(
  nestedDataset, 
  IndexConfig(
    "idx_nested", 
    indexedColumns = Seq("nested.nst.field1"), 
    includedColumns = Seq("id", "name", "nested.nst.field2")))

Adding the support for nested fields impacts the following areas:

  • Validate nested column names (see the sketch after this list)
  • Modify the create index action
  • Modify the filter and rule index functions
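
For the validation step, a minimal sketch of resolving a dot-separated column path against a Spark schema (a hypothetical helper, not the actual Hyperspace code) could look like:

import org.apache.spark.sql.types.{DataType, StructType}

// Resolve a dot-separated column path against a schema; returns the leaf
// field's type if the path exists, None otherwise (case-insensitive here).
def resolveNestedField(schema: StructType, path: String): Option[DataType] =
  path.split('.').foldLeft(Option(schema: DataType)) {
    case (Some(st: StructType), name) =>
      st.fields.find(_.name.equalsIgnoreCase(name)).map(_.dataType)
    case _ => None
  }

// resolveNestedField(df.schema, "nested.nst.field1") => Some(StringType)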

Creating the index

Given the dataset defined above with the listed data, after doing

hs.createIndex(
  df, 
  IndexConfig(
    "idx_nested", 
    indexedColumns = Seq("nested.nst.field1"), 
    includedColumns = Seq("id", "name", "nested.nst.field2")))

the following dataset will be created

root
 |-- nested__nst__field1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested__nst__field2: string (nullable = true)
+-------------------+---+-----+-------------------+
|nested__nst__field1| id| name|nested__nst__field2|
+-------------------+---+-----+-------------------+
|                wa1|  1|name1|                wb1|
|                wa2|  2|name2|                wb2|
+-------------------+---+-----+-------------------+

It is important to understand that the index column is stored as a plain, non-nested column: because Parquet behaves badly when field names contain . (dot), the nested field has to be renamed when the index is written, and projected back under its original nested form at query time.
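
A minimal sketch of this flatten-and-rename projection (a hypothetical helper; the actual renaming happens inside the create index action):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Project the indexed/included columns, replacing "." with "__" so the
// resulting flat column names are Parquet-friendly.
def flattenForIndex(df: DataFrame, columns: Seq[String]): DataFrame =
  df.select(columns.map(c => col(c).as(c.replace(".", "__"))): _*)

// flattenForIndex(nestedDataset,
//   Seq("nested.nst.field1", "id", "name", "nested.nst.field2"))
// yields the columns: nested__nst__field1, id, name, nested__nst__field2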

Search query

Given the following search/filter query

df.filter(df("nested.nst.field1") === "wa1").select("id", "name", "nested.nst.field2")

The optimized and Spark plans without an index are:

Project [id#100, name#101, nested#102.nst.field2]
+- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
   +- Relation[id#100,name#101,nested#102] parquet
Project [id#100, name#101, nested#102.nst.field2]
+- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
   +- FileScan parquet [id#100,name#101,nested#102] Batched: false, Format: Parquet, 
        Location: InMemoryFileIndex[file:/..../tableN2], PartitionFilters: [], 
        PushedFilters: [IsNotNull(nested)], ReadSchema:
        struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>

The transformed optimized and Spark plans should look like:

Project [id#1, name#2, nested__nst__field2#3]
+- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
   +- Relation[nested__nst__field1#0,id#1,name#2,nested__nst__field2#3]
        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
Project [id#1, name#2, nested__nst__field2#3]
+- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
   +- FileScan parquet [nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] Batched: false, 
        Format: Parquet, Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0],
        PartitionFilters: [], PushedFilters: [IsNotNull(nested__nst__field1)], ReadSchema: 
        struct<nested__nst__field1:string,id:int,name:string,nested__nst__field2:string>

Complexities

Transforming the plan

Filters inside the plan must be modified to match the index schema rather than the data schema, that is, the flattened schema instead of the nested fields. Instead of accessing a field through GetStructField(GetStructField(AttributeReference)), the plan must access it directly through an AttributeReference.
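
A sketch of this expression rewrite (hypothetical code; the real rule must also rewrite the Project and Relation nodes, as shown below):

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Collapse a GetStructField chain into its flattened name,
// e.g. nested.nst.field1 => nested__nst__field1.
def flatName(e: Expression): Option[String] = e match {
  case GetStructField(child, _, Some(name)) => flatName(child).map(p => s"${p}__$name")
  case a: Attribute => Some(a.name)
  case _ => None
}

// Replace nested accesses with the matching flat attribute of the index relation.
def rewriteNestedAccesses(plan: LogicalPlan, indexAttrs: Map[String, Attribute]): LogicalPlan =
  plan transformAllExpressions {
    case g: GetStructField => flatName(g).flatMap(indexAttrs.get).getOrElse(g)
  }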

Given the query plan

Project [id#100, name#101, nested#102.nst.field2]
+- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
   +- Relation[id#100,name#101,nested#102] parquet

The filter must be modified from

Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))

to

Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))

The projection must change from

Project [id#100, name#101, nested#102.nst.field2]

to

Project [id#1, name#2, nested__nst__field2#3]

And the relation must change from

Relation[id#100,name#101,nested#102] parquet

to

Relation[nested__nst__field1#0, id#1,name#2, nested__nst__field2#3] Hyperspace(Type: CI, 
  Name: idx_nested, LogVersion: 1)

Hybrid scans

The transformed plans for Hybrid Scan need to accommodate a union between the newest files appended to the dataset, which have the following schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the index schema

root
 |-- nested__nst__field1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested__nst__field2: string (nullable = true)
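
Conceptually, the appended files are read with the original nested schema and projected into the index layout before being unioned with the index (a sketch; appendedDf and indexDf are assumed names for DataFrames over the appended files and the index):

import org.apache.spark.sql.functions.col

// Flatten the appended data to match the index schema, then union the two sides.
val appendedFlattened = appendedDf.select(
  col("nested.nst.field1").as("nested__nst__field1"),
  col("id"),
  col("name"),
  col("nested.nst.field2").as("nested__nst__field2"))
val merged = indexDf.union(appendedFlattened)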

Search queries

Files Appended

The optimized plan

Union
:- Project [id#1, name#2, nested__nst__field2#3]
:  +- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
:     +- Relation[nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] 
:        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- Relation[id#1,name#2,nested#102] parquet

The Spark plan

Union
:- Project [id#1, name#2, nested__nst__field2#3]
:  +- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
:     +- FileScan parquet [nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] Batched: false, 
:          Format: Parquet, Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
:          PartitionFilters: [], PushedFilters: [IsNotNull(nested__nst__field1)], ReadSchema: 
:          struct<nested__nst__field1:string,id:int,name:string,nested__nst__field2:string>
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- FileScan parquet [id#100,name#101,nested#102] Batched: false, Format: Parquet, 
           Location: InMemoryFileIndex[file:/..../tableN2/appended_file.parquet], 
           PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: 
           struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>

Files Deleted

The optimized plan

Project [id#1, name#2, nested__nst__field2#3]
+- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
   +- Project [nested__nst__field1#0, id#1, name#2, nested__nst__field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- Relation[nested__nst__field1#0,id#1,name#2,nested__nst__field2#3,_data_file_id#368L] 
              Hyperspace(Type: CI, Name: indexWithLineage, LogVersion: 1)

The Spark plan

Project [id#1, name#2, nested__nst__field2#3]
+- Filter (isnotnull(nested__nst__field1#0) && (nested__nst__field1#0 = wa1))
   +- Project [nested__nst__field1#0, id#1, name#2, nested__nst__field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- FileScan parquet [nested__nst__field1#0,id#1,name#2,nested__nst__field2#3] Batched: false, 
              Format: Parquet, Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
              PartitionFilters: [], PushedFilters: [IsNotNull(nested__nst__field1)], ReadSchema: 
              struct<nested__nst__field1:string,id:int,name:string,nested__nst__field2:string>
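
In other words, index rows that originate from deleted source files are excluded via the lineage column (a sketch; indexDf and deletedFileIds are assumed names):

import org.apache.spark.sql.functions.col

// Exclude index rows whose source data file was deleted.
val deletedFileIds = Seq(3L)
val validIndexRows = indexDf.filter(!col("_data_file_id").isin(deletedFileIds: _*))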

Join queries

The following join queries use a dataset slightly different from the one at the beginning. The plans below are extracted from the HybridScanForNestedFieldsTest tests.

root
 |-- Date: string (nullable = true)
 |-- RGUID: string (nullable = true)
 |-- Query: string (nullable = true)
 |-- imprs: integer (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- leaf: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- cnt: integer (nullable = true)

Join append only

Original plan

Project [cnt#556, query#533, id#557, Date#543, id#563]
+- Join Inner, (cnt#556 = cnt#562)
   :- Project [nested#536.leaf.cnt AS cnt#556, query#533, nested#536.leaf.id AS id#557]
   :  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && (nested#536.leaf.cnt <= 40))
   :     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
   +- Project [nested#548.leaf.cnt AS cnt#562, Date#543, nested#548.leaf.id AS id#563]
      +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && (nested#548.leaf.cnt >= 20))
         +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

Altered optimized plan

Project [cnt#653, query#533, id#654, Date#543, id#660]
+- Join Inner, (cnt#653 = cnt#659)
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#653, query#1 AS query#533, nested__leaf__id#2 AS id#654]
   :  :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :  :     :      (nested__leaf__cnt#0 <= 40))
   :  :     +- Relation[nested__leaf__cnt#0,Query#1,nested__leaf__id#2] 
   :  :          Hyperspace(Type: CI, Name: index_Append, LogVersion: 1)
   :  +- RepartitionByExpression [cnt#653], 200
   :     +- Project [nested#536.leaf.cnt AS cnt#653, query#533, nested#536.leaf.id AS id#654]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- Relation[Query#533,nested#536] parquet
   +- BucketUnion 200 buckets, bucket columns: [cnt]
      :- Project [nested__leaf__cnt#0 AS cnt#659, Date#1 AS Date#543, nested__leaf__id#2 AS id#660]
      :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
      :     :      (nested__leaf__cnt#0 >= 20))
      :     +- Relation[nested__leaf__cnt#0,Date#1,nested__leaf__id#2] 
      :          Hyperspace(Type: CI, Name: indexType2_Append, LogVersion: 1)
      +- RepartitionByExpression [cnt#659], 200
         +- Project [nested#548.leaf.cnt AS cnt#659, Date#543, nested#548.leaf.id AS id#660]
            +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && 
               :      (nested#548.leaf.cnt >= 20))
               +- Relation[Date#543,nested#548] parquet

Altered Spark plan

Project [cnt#653, query#533, id#654, Date#543, id#660]
+- SortMergeJoin [cnt#653], [cnt#659], Inner
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#653, query#1 AS query#533, nested__leaf__id#2 AS id#654]
   :  :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :  :     :      (nested__leaf__cnt#0 <= 40))
   :  :     +- FileScan Hyperspace(Type: CI, Name: index_Append, LogVersion: 1) 
   :  :          [nested__leaf__cnt#0,Query#1,nested__leaf__id#2] 
   :  :          Batched: true, Format: Parquet, 
   :  :          Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...],
   :  :          PartitionFilters: [], PushedFilters: [IsNotNull(nested__leaf__cnt), 
   :  :          GreaterThanOrEqual(nested__leaf__cnt,20), LessThanOrEqual(nested__leaf__cnt,40), 
   :  :          ReadSchema: struct<nested__leaf__cnt:int,Query:string,nested__leaf__id:string>, 
   :  :          SelectedBucketsCount: 200 out of 200
   :  +- Exchange hashpartitioning(cnt#653, 200)
   :     +- Project [nested#536.leaf.cnt AS cnt#653, query#533, nested#536.leaf.id AS id#654]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- FileScan parquet [Query#533,nested#536] Batched: false, Format: Parquet, 
   :                Location: InMemoryFileIndex[file:/.../..., PartitionFilters: [], 
   :                PushedFilters: [IsNotNull(nested)], 
   :                ReadSchema: struct<Query:string,nested:struct<id:string,leaf:struct<id:string,cnt:int>>>
   +- BucketUnion 200 buckets, bucket columns: [cnt]
      :- Project [nested__leaf__cnt#0 AS cnt#659, Date#1 AS Date#543, nested__leaf__id#2 AS id#660]
      :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
      :     :      (nested__leaf__cnt#0 >= 20))
      :     +- FileScan Hyperspace(Type: CI, Name: indexType2_Append, LogVersion: 1) 
      :          [nested__leaf__cnt#0,Date#1,nested__leaf__id#2]
      :          Batched: true, Format: Parquet, 
      :          Location: InMemoryFileIndex[file://.../spark_warehouse/indexes/...], 
      :          PartitionFilters: [], PushedFilters: [IsNotNull(nested__leaf__cnt), 
      :          LessThanOrEqual(nested__leaf__cnt,40), GreaterThanOrEqual(nested__leaf__cnt,20), 
      :          ReadSchema: struct<nested__leaf__cnt:int,Date:string,nested__leaf__id:string>, 
      :          SelectedBucketsCount: 200 out of 200
      +- Exchange hashpartitioning(cnt#659, 200)
         +- Project [nested#548.leaf.cnt AS cnt#659, Date#543, nested#548.leaf.id AS id#660]
            +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && 
               :      (nested#548.leaf.cnt >= 20))
               +- FileScan parquet [Date#543,nested#548] Batched: false, Format: Parquet, 
                    Location: InMemoryFileIndex[file:/.../..., PartitionFilters: [], 
                    PushedFilters: [IsNotNull(nested)], 
                    ReadSchema: struct<Date:string,nested:struct<id:string,leaf:struct<id:string,cnt:int>>>

Delete files

Original plan

Project [cnt#556, query#533, Date#543]
+- Join Inner, (cnt#556 = cnt#560)
   :- Project [nested#536.leaf.cnt AS cnt#556, query#533]
   :  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :     :      (nested#536.leaf.cnt <= 40))
   :     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
   +- Project [nested#548.leaf.cnt AS cnt#560, Date#543]
      +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && 
         :      (nested#548.leaf.cnt >= 20))
         +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

Altered optimized plan

Project [cnt#605, query#533, Date#543]
+- Join Inner, (cnt#605 = cnt#609)
   :- Project [nested__leaf__cnt#0 AS cnt#605, query#1 AS query#533]
   :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :     :      (nested__leaf__cnt#0 <= 40))
   :     +- Project [nested__leaf__cnt#0, Query#1, nested__leaf__id#2]
   :        +- Filter NOT _data_file_id#615L IN (2,3)
   :           +- Relation[nested__leaf__cnt#0,Query#1,nested__leaf__id#2,_data_file_id#615L] 
   :                Hyperspace(Type: CI, Name: index_Delete, LogVersion: 1)
   +- Project [nested__leaf__cnt#0 AS cnt#609, Date#1 AS Date#543]
      +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
         :      (nested__leaf__cnt#0 >= 20))
         +- Project [nested__leaf__cnt#0, Date#1, nested__leaf__id#2]
            +- Filter NOT _data_file_id#616L IN (2,3)
               +- Relation[nested__leaf__cnt#0,Date#1,nested__leaf__id#2,_data_file_id#616L] 
                    Hyperspace(Type: CI, Name: indexType2_Delete2, LogVersion: 1)

Altered Spark plan

Project [cnt#605, query#533, Date#543]
+- SortMergeJoin [cnt#605], [cnt#609], Inner
   :- Project [nested__leaf__cnt#0 AS cnt#605, query#1 AS query#533]
   :  +- Filter (((NOT _data_file_id#615L IN (2,3) && isnotnull(nested__leaf__cnt#0)) && 
   :     :      (nested__leaf__cnt#0 >= 20)) && (nested__leaf__cnt#0 <= 40))
   :     +- FileScan Hyperspace(Type: CI, Name: index_Delete, LogVersion: 1) 
   :          [nested__leaf__cnt#0,Query#1,_data_file_id#615L] 
   :          Batched: true, Format: Parquet, 
   :          Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...], 
   :          PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), 
   :          IsNotNull(nested__leaf__cnt), GreaterThanOrEqual(nested__leaf__cnt,20),  
   :          ReadSchema: struct<nested__leaf__cnt:int,Query:string,_data_file_id:bigint>, 
   :          SelectedBucketsCount: 200 out of 200
   +- Project [nested__leaf__cnt#0 AS cnt#609, Date#1 AS Date#543]
      +- Filter (((NOT _data_file_id#616L IN (2,3) && isnotnull(nested__leaf__cnt#0)) && 
         :      (nested__leaf__cnt#0 <= 40)) && (nested__leaf__cnt#0 >= 20))
         +- FileScan Hyperspace(Type: CI, Name: indexType2_Delete2, LogVersion: 1) 
              [nested__leaf__cnt#0,Date#1,_data_file_id#616L] 
              Batched: true, Format: Parquet, 
              Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...], 
              PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), 
              IsNotNull(nested__leaf__cnt), LessThanOrEqual(nested__leaf__cnt,40), 
              ReadSchema: struct<nested__leaf__cnt:int,Date:string,_data_file_id:bigint>, 
              SelectedBucketsCount: 200 out of 200

Append + Delete

Original plan

Project [cnt#556, query#533, Date#543]
+- Join Inner, (cnt#556 = cnt#560)
   :- Project [nested#536.leaf.cnt AS cnt#556, query#533]
   :  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && (nested#536.leaf.cnt <= 40))
   :     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
   +- Project [nested#548.leaf.cnt AS cnt#560, Date#543]
      +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && (nested#548.leaf.cnt >= 20))
         +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

Altered optimized plan

Project [cnt#617, query#533, Date#543]
+- Join Inner, (cnt#617 = cnt#621)
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#617, query#1 AS query#533]
   :  :  +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 >= 20)) && 
   :  :     :      (nested__leaf__cnt#0 <= 40))
   :  :     +- Project [nested__leaf__cnt#0, Query#1, nested__leaf__id#2]
   :  :        +- Filter NOT (_data_file_id#627L = 3)
   :  :           +- Relation[nested__leaf__cnt#0,Query#1,nested__leaf__id#2,_data_file_id#627L] 
   :  :                Hyperspace(Type: CI, Name: index_Both, LogVersion: 1)
   :  +- RepartitionByExpression [cnt#617], 200
   :     +- Project [nested#536.leaf.cnt AS cnt#617, query#533]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- Relation[Query#533,nested#536] parquet
   +- Project [nested__leaf__cnt#0 AS cnt#621, Date#1 AS Date#543]
      +- Filter ((isnotnull(nested__leaf__cnt#0) && (nested__leaf__cnt#0 <= 40)) && 
         :      (nested__leaf__cnt#0 >= 20))
         +- Project [nested__leaf__cnt#0, Date#1, nested__leaf__id#2]
            +- Filter NOT _data_file_id#628L INSET (2,3)
               +- Relation[nested__leaf__cnt#0,Date#1,nested__leaf__id#2,_data_file_id#628L] 
                    Hyperspace(Type: CI, Name: indexType2_Delete, LogVersion: 1)

Altered Spark plan

Project [cnt#617, query#533, Date#543]
+- SortMergeJoin [cnt#617], [cnt#621], Inner
   :- BucketUnion 200 buckets, bucket columns: [cnt]
   :  :- Project [nested__leaf__cnt#0 AS cnt#617, query#1 AS query#533]
   :  :  +- Filter (((NOT (_data_file_id#627L = 3) && isnotnull(nested__leaf__cnt#0)) &&
   :  :     :      (nested__leaf__cnt#0 >= 20)) && (nested__leaf__cnt#0 <= 40))
   :  :     +- FileScan Hyperspace(Type: CI, Name: index_Both, LogVersion: 1) 
   :  :          [nested__leaf__cnt#0,Query#1,_data_file_id#627L] 
   :  :          Batched: true, Format: Parquet, 
   :  :          Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/...], 
   :  :          PartitionFilters: [], PushedFilters: [Not(EqualTo(_data_file_id,3)), 
   :  :          IsNotNull(nested__leaf__cnt), GreaterThanOrEqual(nested__leaf__cnt,20), 
   :  :          ReadSchema: struct<nested__leaf__cnt:int,Query:string,_data_file_id:bigint>, 
   :  :          SelectedBucketsCount: 200 out of 200
   :  +- Exchange hashpartitioning(cnt#617, 200)
   :     +- Project [nested#536.leaf.cnt AS cnt#617, query#533]
   :        +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && 
   :           :      (nested#536.leaf.cnt <= 40))
   :           +- FileScan parquet [Query#533,nested#536] Batched: false, Format: Parquet, 
   :                Location: InMemoryFileIndex[file:/.../...], 
   :                PartitionFilters: [], PushedFilters: [IsNotNull(nested)], 
   :                ReadSchema: struct<Query:string,nested:struct<id:string,leaf:struct<id:string,cnt:int>>>
   +- Project [nested__leaf__cnt#0 AS cnt#621, Date#1 AS Date#543]
      +- Filter (((NOT _data_file_id#628L INSET (2,3) && isnotnull(nested__leaf__cnt#0)) && 
         :      (nested__leaf__cnt#0 <= 40)) && (nested__leaf__cnt#0 >= 20))
         +- FileScan Hyperspace(Type: CI, Name: indexType2_Delete, LogVersion: 1) 
              [nested__leaf__cnt#0,Date#1,_data_file_id#628L] 
              Batched: true, Format: Parquet, 
              Location: InMemoryFileIndex[file:/.../spark_warehouse/indexes/..., 
              PartitionFilters: [], PushedFilters: [Not(In(_data_file_id, [2,3])), 
              IsNotNull(nested__leaf__cnt), LessThanOrEqual(nested__leaf__cnt,40),
              ReadSchema: struct<nested__leaf__cnt:int,Date:string,_data_file_id:bigint>, 
              SelectedBucketsCount: 200 out of 200

Impact

There should be no impact on existing Hyperspace functionality.

Background

This proposal is the result of the discussion in #312.

andrei-ionescu added the proposal and untriaged labels on Jan 31, 2021
@sezruby
Collaborator

sezruby commented Feb 1, 2021

Looks good! Could you also consider
df.filter(df("nested.nst.field1") === "wa1").select("id", "name", "nested.nst.fieldId1") ?
We might need to replace all occurrences of "nested#102.nst.field1" in the upper levels of the plan tree, not just in (Project => Filter => Relation).
It would be good if you could verify with some test code that replacing attributes works for all plan nodes.
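
A minimal check along those lines (hypothetical; transformed stands for the plan after the rewrite) might be:

import org.apache.spark.sql.catalyst.expressions.GetStructField

// After the rewrite, no GetStructField accesses should survive anywhere in the plan.
val remaining = transformed.collect {
  case node => node.expressions.flatMap(_.collect { case g: GetStructField => g })
}.flatten
assert(remaining.isEmpty, "nested accesses were not fully rewritten")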

@imback82
Contributor

imback82 commented Feb 1, 2021

+1 as well. Could you capture that nested fields can also be supported for included columns?

@rapoth
Contributor

rapoth commented Feb 1, 2021

+1 Looks awesome! Thanks @andrei-ionescu!

I have a few minor questions/nits:

  1. In your Goal section:
     hs.createIndex(
       df, 
       IndexConfig(
         "idx_nested", 
         indexedColumns = Seq("nested.nst.field1"), 
         includedColumns = Seq("id", "name", "nested.nst.field1")
       )
     )
    
    Did you mean nested.nst.field2 in includedColumns?
  2. In Creating the index:
    hs.createIndex(df, IndexConfig("idx_nested", indexedColumns = Seq("nested.nst.field1"), includedColumns = Seq("id", "name")))
    
    should have nested.nst.field2 in the included columns (and the appropriate name change reflected in the example right below).
  3. Regarding "Join queries with hybrid scans do get even more complex", could you add more details (similar to how you did for Hybrid scan) for completeness?

@andrei-ionescu
Contributor Author

andrei-ionescu commented Feb 2, 2021

At first, I intentionally projected the field that we search by (the indexed field), but I switched to your suggestion as it seems the more common usage.

I'll add details for joins too, as I gain a better understanding of how the plans get composed.

@rapoth
Contributor

rapoth commented Feb 2, 2021

Sounds good, thank you!

@andrei-ionescu
Contributor Author

@rapoth, @imback82, @sezruby: I added a few more plans for hybrid scans - one for appended files and one for deleted files. Let me know if there is something off with them.

@andrei-ionescu
Contributor Author

andrei-ionescu commented Feb 24, 2021

@rapoth, @imback82, @sezruby: There is an improvement that I need to address: the fromDifferentBaseRelations method inside ensureAttributeRequirements needs to be modified.

Given the nested dataset

root
 |-- Date: string (nullable = true)
 |-- RGUID: string (nullable = true)
 |-- Query: string (nullable = true)
 |-- imprs: integer (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- leaf: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- cnt: integer (nullable = true)

and this join

val query2 = df1.filter(df1("nested.leaf.cnt") >= 20).select(df1("nested.leaf.cnt"), df1("query"))
val query = df2.filter(df2("nested.leaf.cnt") <= 40).select(df2("nested.leaf.cnt"), df2("Date"))
query2.join(query, "cnt")

resulting in the following join plan over nested fields

Join Inner, (cnt#556 = cnt#560)
:- Project [nested#536.leaf.cnt AS cnt#556, query#533]
:  +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && (nested#536.leaf.cnt <= 40))
:     +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
+- Project [nested#548.leaf.cnt AS cnt#560, Date#543]
   +- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && (nested#548.leaf.cnt >= 20))
      +- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet

the fromDifferentBaseRelations method will return false, so the join will not meet the criteria to qualify for a plan rewrite. This is because the fields have different ids (nested#536.leaf.cnt AS cnt#556 vs nested#548.leaf.cnt AS cnt#560) and the match is performed on ids, not on names.

Canonicalised fields:

  • Condition left: none#556
  • Condition right: none#560
  • Plan output left: List(none#531, none#532, none#533, none#534, none#535, none#536)
  • Plan output right: List(none#543, none#544, none#545, none#546, none#547, none#548)

Do you see any problems if I modify fromDifferentBaseRelations to check fields by name instead of id, taking into account the case-sensitivity setting?
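
For illustration, a name comparison honoring Spark's spark.sql.caseSensitive setting could be as simple as (hypothetical helper):

// Compare field names according to the case-sensitivity setting.
def sameFieldName(a: String, b: String, caseSensitive: Boolean): Boolean =
  if (caseSensitive) a == b else a.equalsIgnoreCase(b)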

@andrei-ionescu andrei-ionescu linked a pull request Feb 24, 2021 that will close this issue
@sezruby
Collaborator

sezruby commented Feb 26, 2021

@andrei-ionescu I think the problem is that the canonicalized value of the left side of the condition is none#556, not none#536.
Could you try to fix the canonicalization of the condition expression properly (if possible), instead of matching by name?

@andrei-ionescu
Contributor Author

@sezruby I don't know if that is possible without other changes. The ensureAttributeRequirements(l: FileBasedRelation, r: FileBasedRelation, condition: Expression): Boolean method works with FileBasedRelations and the join condition Expression. If you look at the plan above, the condition uses the aliases ((cnt#556 = cnt#560)) that come from the projections (i.e., nested#536.leaf.cnt AS cnt#556), but the FileBasedRelation has no way to connect #556 to #536 because the projection step/layer is not taken into account.

I just want to mention that the plan listed above is what Spark produces by itself - Spark adds those aliases in this kind of join.

My approach was not to inject the projection into ensureAttributeRequirements but to check the requirements based on the field names.

Your suggestion is to keep the canonicalisation, but to do that we need to go through the projection layers to build the lineage from the join condition down to the relation fields. I'll try adding a function that transforms the canonicalized condition fields into relation fields and use it inside the fromDifferentBaseRelations method.
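
A sketch of such a function (hypothetical; it walks the Project layers to map an aliased condition attribute like cnt#556 back to the underlying expression over nested#536):

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// Collect the alias map of each Project layer and resolve the attribute
// through them, e.g. cnt#556 => nested#536.leaf.cnt.
def resolveThroughProjects(attr: Attribute, plan: LogicalPlan): Expression = {
  val aliasMaps = plan.collect { case p: Project =>
    AttributeMap(p.projectList.collect { case a: Alias => (a.toAttribute, a.child) })
  }
  aliasMaps.foldLeft(attr: Expression) {
    case (a: Attribute, m) => m.getOrElse(a, a)
    case (e, _) => e
  }
}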

The next step will be modifying getBestIndexPair, which makes use of the field names.

@sezruby
Collaborator

sezruby commented Feb 26, 2021

Matching based on the field name might cause some confusion/problems if the same column names exist on both the left and the right side.
So I think we should keep trying to use the id.

@andrei-ionescu
Contributor Author

andrei-ionescu commented Mar 2, 2021

@rapoth, @imback82, @sezruby

  1. I updated the proposal above and added plans for hybrid scans, joins and most of the use cases.
  2. The implementation for nested fields support is done in Support index creation on nested fields #379, Support filter over indexes on nested fields #380, Support joins over nested fields with indexes on nested fields #381.

Kindly have a look at it.
