This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Index nested fields #365

Open · wants to merge 3 commits into base: master
@@ -25,7 +25,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.telemetry.{AppInfo, CreateActionEvent, HyperspaceEvent}
- import com.microsoft.hyperspace.util.ResolverUtils
+ import com.microsoft.hyperspace.util.{ResolverUtils, SchemaUtils}

class CreateAction(
spark: SparkSession,
@@ -65,9 +65,15 @@ class CreateAction(
}

private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
+ // Flatten the schema to support nested fields
[Collaborator suggested change: "// Flatten the schema to support nested fields" → "// Flatten the schema to support nested fields."]

+ val fields = SchemaUtils.escapeFieldNames(SchemaUtils.flatten(schema))
// Resolve index config columns from available column names present in the schema.
ResolverUtils
- .resolve(spark, config.indexedColumns ++ config.includedColumns, schema.fieldNames)
+ .resolve(
+ spark,
+ SchemaUtils.escapeFieldNames(config.indexedColumns)
+ ++ SchemaUtils.escapeFieldNames(config.includedColumns),
+ fields)
.isDefined
}
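
The diff leans on SchemaUtils.flatten, escapeFieldNames, unescapeFieldNames, and hasNestedFields, whose definitions are not part of this excerpt. Below is a minimal sketch of what such helpers could look like, assuming nested fields are flattened to dot-separated leaf names and that dots are escaped by replacing them with a separator such as "__"; the escape scheme is an assumption, not taken from this PR.

// Sketch only: assumed shape of the SchemaUtils helpers referenced in the diff.
import org.apache.spark.sql.types.{StructField, StructType}

object SchemaUtilsSketch {
  // Flatten a possibly nested schema into dot-separated leaf names,
  // e.g. struct(a: struct(b: Int), c: String) => Seq("a.b", "c").
  def flatten(schema: StructType, prefix: Option[String] = None): Seq[String] =
    schema.fields.toSeq.flatMap {
      case StructField(name, nested: StructType, _, _) =>
        flatten(nested, Some(prefix.fold(name)(p => s"$p.$name")))
      case StructField(name, _, _, _) =>
        Seq(prefix.fold(name)(p => s"$p.$name"))
    }

  // Assumed escape scheme: make a flattened nested name usable as a plain
  // top-level column name in the written index data.
  def escapeFieldNames(names: Seq[String]): Seq[String] = names.map(_.replace(".", "__"))

  // Inverse of the above; a real implementation would need to guard against
  // source columns that legitimately contain the separator.
  def unescapeFieldNames(names: Seq[String]): Seq[String] = names.map(_.replace("__", "."))

  def hasNestedFields(names: Seq[String]): Boolean = names.exists(_.contains("."))
}

With helpers of this shape, isValidIndexSchema resolves a configured column such as "nested.leaf" (escaped to "nested__leaf") against the escaped, flattened schema names; it also explains why a later hunk unescapes the configured names before resolving them against the flattened, dot-separated DataFrame schema.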

@@ -19,13 +19,15 @@ package com.microsoft.hyperspace.actions
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
- import org.apache.spark.sql.functions.input_file_name
+ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+ import org.apache.spark.sql.functions.{col, input_file_name}
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.index.sources.FileBasedRelation
- import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
+ import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils}

/**
* CreateActionBase provides functionality to write dataframe as covering index.
@@ -73,7 +75,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s)))))

val coveringIndexProperties =
- (hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation)).toMap
+ (hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation) ++
+ usesNestedFieldsProperty(indexConfig)).toMap

IndexLogEntry(
indexConfig.indexName,
@@ -109,6 +112,14 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
}
}

+ private def usesNestedFieldsProperty(indexConfig: IndexConfig): Option[(String, String)] = {
+ if (SchemaUtils.hasNestedFields(indexConfig.indexedColumns ++ indexConfig.includedColumns)) {
+ Some(IndexConstants.USES_NESTED_FIELDS_PROPERTY -> "true")
+ } else {
+ None
+ }
+ }

protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
val numBuckets = numBucketsForIndex(spark)

@@ -117,7 +128,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

// run job
val repartitionedIndexDataFrame =
- indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(df(_)): _*)
+ indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"$c")): _*)

// Save the index with the number of buckets specified.
repartitionedIndexDataFrame.write
@@ -144,9 +155,9 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
df: DataFrame,
indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
val spark = df.sparkSession
- val dfColumnNames = df.schema.fieldNames
- val indexedColumns = indexConfig.indexedColumns
- val includedColumns = indexConfig.includedColumns
+ val dfColumnNames = SchemaUtils.flatten(df.schema)
[Collaborator: Could you add a comment here explaining the flatten operation?]

+ val indexedColumns = SchemaUtils.unescapeFieldNames(indexConfig.indexedColumns)
[Collaborator: Could you add a comment explaining why unescapeFieldNames is required here? For example, that nested column names are stored escaped in the index log entry.]

+ val includedColumns = SchemaUtils.unescapeFieldNames(indexConfig.includedColumns)
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)

@@ -177,8 +188,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// 2. If source data is partitioned, all partitioning key(s) are added to index schema
// as columns if they are not already part of the schema.
val partitionColumns = relation.partitionSchema.map(_.name)
- val missingPartitionColumns = partitionColumns.filter(
- ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
+ val missingPartitionColumns =
+ partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns

// File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
@@ -202,10 +213,16 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
.select(
allIndexColumns.head,
allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
+ .toDF(
+ SchemaUtils.escapeFieldNames(allIndexColumns) :+ IndexConstants.DATA_FILE_NAME_ID: _*)
} else {
df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
+ .toDF(SchemaUtils.escapeFieldNames(columnsFromIndexConfig): _*)
}

- (indexDF, resolvedIndexedColumns, resolvedIncludedColumns)
+ val escapedIndexedColumns = SchemaUtils.escapeFieldNames(resolvedIndexedColumns)
+ val escapedIncludedColumns = SchemaUtils.escapeFieldNames(resolvedIncludedColumns)
+
+ (indexDF, escapedIndexedColumns, escapedIncludedColumns)
}
}
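
To make the renaming concrete: under the assumed "." to "__" escape from the sketch earlier, an index configured over a nested field would flow through the hunks above roughly as follows (the config is hypothetical).

// Hypothetical index config over a nested field.
import com.microsoft.hyperspace.index.IndexConfig

val nestedIndexConfig = IndexConfig(
  indexName = "nestedIdx",
  indexedColumns = Seq("nested.leaf"), // nested field, dot-separated
  includedColumns = Seq("id"))

// With the assumed escape scheme, the prepared index DataFrame would carry the
// top-level columns "nested__leaf" and "id", and the escaped names are what the
// caller gets back for bucketing, which would explain the switch from df(_) to
// col(_) in the repartition above.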
@@ -109,4 +109,7 @@ object IndexConstants {
// To provide multiple paths in the globbing pattern, separate them with commas, e.g.
// "/temp/1/*, /temp/2/*"
val GLOBBING_PATTERN_KEY = "spark.hyperspace.source.globbingPattern"

+ // Indicate whether the index has been built over a nested field.
+ private[hyperspace] val USES_NESTED_FIELDS_PROPERTY = "hasNestedFields"
[Collaborator: Could you move this up to around line 104?]

}
@@ -557,6 +557,11 @@ case class IndexLogEntry(
config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
}

+ def usesNestedFields: Boolean = {
+ derivedDataset.properties.properties.getOrElse(
+ IndexConstants.USES_NESTED_FIELDS_PROPERTY, "false").toBoolean
+ }
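
Together with usesNestedFieldsProperty in CreateActionBase above, this gives the nested-fields flag a simple round trip through the index properties; a small illustrative sketch (the map stands in for the full property set):

// Illustrative only: the flag is written at creation time and read back from the
// log entry's derivedDataset properties with a "false" default.
import com.microsoft.hyperspace.index.IndexConstants

val written: Map[String, String] =
  Map(IndexConstants.USES_NESTED_FIELDS_PROPERTY -> "true") // from usesNestedFieldsProperty

val usesNested: Boolean =
  written.getOrElse(IndexConstants.USES_NESTED_FIELDS_PROPERTY, "false").toBoolean

assert(usesNested)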

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
@@ -110,12 +110,23 @@ private[hyperspace] case class BucketUnionExec(children: Seq[SparkPlan], bucketS
override def output: Seq[Attribute] = children.head.output

override def outputPartitioning: Partitioning = {
- assert(children.map(_.outputPartitioning).toSet.size == 1)
- assert(children.head.outputPartitioning.isInstanceOf[HashPartitioning])
- assert(
- children.head.outputPartitioning
- .asInstanceOf[HashPartitioning]
- .numPartitions == bucketSpec.numBuckets)
- children.head.outputPartitioning
+ val parts = children.map(_.outputPartitioning).distinct
+ assert(parts.forall(_.isInstanceOf[HashPartitioning]))
+ assert(parts.forall(_.numPartitions == bucketSpec.numBuckets))

+ val reduced = parts.reduceLeft { (a, b) =>
[Collaborator: Could you add some comments about reduced, with an example?]

+ val h1 = a.asInstanceOf[HashPartitioning]
+ val h2 = b.asInstanceOf[HashPartitioning]
+ val h1Name = h1.references.head.name
+ val h2Name = h2.references.head.name
+ val same = h1Name.contains(h2Name) || h2Name.contains(h1Name)
+ assert(same)
+ if (h1Name.length > h2Name.length) {
+ h1
+ } else {
+ h2
+ }
+ }
+ reduced
}
}
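
The reviewer above asks for an example of what reduced does; here is a hypothetical one (the column names are made up, chosen so that one head reference name contains the other, which is what the same assertion requires): the partitioning whose head reference has the longer name is kept.

// Hypothetical example: two children report hash partitionings whose head
// reference names contain one another; the longer name wins.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.StringType

val escapedNested = HashPartitioning(Seq(AttributeReference("nested__leaf", StringType)()), 200)
val plain = HashPartitioning(Seq(AttributeReference("leaf", StringType)()), 200)

val kept = Seq(escapedNested, plain).reduceLeft { (a, b) =>
  val (n1, n2) = (a.references.head.name, b.references.head.name)
  assert(n1.contains(n2) || n2.contains(n1)) // mirrors the same check above
  if (n1.length > n2.length) a else b        // "nested__leaf" is longer, so escapedNested is kept
}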
@@ -16,19 +16,22 @@

package com.microsoft.hyperspace.index.rules

+ import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
- import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
+ import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
- import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
+ import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils, SchemaUtils}

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -113,8 +116,8 @@ object FilterIndexRule

val candidateIndexes = allIndexes.filter { index =>
indexCoversPlan(
- outputColumns,
- filterColumns,
+ SchemaUtils.escapeFieldNames(outputColumns),
+ SchemaUtils.escapeFieldNames(filterColumns),
index.indexedColumns,
index.includedColumns)
}
@@ -168,9 +171,19 @@ object ExtractFilterNode {
val projectColumnNames = CleanupAliases(project)
.asInstanceOf[Project]
.projectList
- .map(_.references.map(_.asInstanceOf[AttributeReference].name))
+ .map(PlanUtils.extractNamesFromExpression)
.flatMap(_.toSeq)
- val filterColumnNames = condition.references.map(_.name).toSeq
+ val filterColumnNames = PlanUtils
[Collaborator: Could you add a comment here? For example: if both a nested column and its parent column appear in the filter condition, only the nested column name is output; the reason would be good to include as well.]

+ .extractNamesFromExpression(condition)
+ .toSeq
+ .sortBy(-_.length)
+ .foldLeft(Seq.empty[String]) { (acc, e) =>
+ if (!acc.exists(i => i.startsWith(e))) {
+ acc :+ e
+ } else {
+ acc
+ }
+ }

Some(project, filter, projectColumnNames, filterColumnNames)
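
The reviewer above asks for a comment on this logic; a small worked example of the fold (with hypothetical names): names are sorted longest first, and a name is dropped when an already-kept longer name starts with it, so a parent column is suppressed in favor of its nested child.

// Hypothetical extracted names: a nested field, its parent, and an unrelated column.
val extracted = Seq("a.b", "a.b.c", "d")

val deduped = extracted
  .sortBy(-_.length)                          // Seq("a.b.c", "a.b", "d")
  .foldLeft(Seq.empty[String]) { (acc, e) =>
    if (!acc.exists(i => i.startsWith(e))) acc :+ e else acc
  }

// deduped == Seq("a.b.c", "d"): "a.b" is dropped because the kept "a.b.c" starts with it.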
