
Commit d4cd2e8
Add support for building index on nested fields
andrei-ionescu committed Feb 24, 2021
1 parent 66076e0
Showing 13 changed files with 1,783 additions and 49 deletions.
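
This commit lets IndexConfig columns reference nested struct fields using dot notation. A minimal usage sketch, assuming the existing public API (Hyperspace.createIndex with an IndexConfig); the dataset path and the payload struct column are hypothetical, and spark is an active SparkSession:

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConfig

// Hypothetical dataset with schema: id: long, payload: struct<kind: string, value: long>.
val df = spark.read.parquet("/data/events")
val hs = new Hyperspace(spark)

// The indexed column is a nested field, referenced with dot notation.
hs.createIndex(
  df,
  IndexConfig("eventsIdx", indexedColumns = Seq("payload.kind"), includedColumns = Seq("id")))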
@@ -25,7 +25,7 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.telemetry.{AppInfo, CreateActionEvent, HyperspaceEvent}
import com.microsoft.hyperspace.util.ResolverUtils
import com.microsoft.hyperspace.util.{ResolverUtils, SchemaUtils}

class CreateAction(
spark: SparkSession,
@@ -65,9 +65,15 @@ class CreateAction(
}

private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
// Flatten the schema to support nested fields
val fields = SchemaUtils.escapeFieldNames(SchemaUtils.flatten(schema))
// Resolve index config columns from available column names present in the schema.
ResolverUtils
.resolve(spark, config.indexedColumns ++ config.includedColumns, schema.fieldNames)
.resolve(
spark,
SchemaUtils.escapeFieldNames(config.indexedColumns)
++ SchemaUtils.escapeFieldNames(config.includedColumns),
fields)
.isDefined
}

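The validation above relies on SchemaUtils.flatten and SchemaUtils.escapeFieldNames, which are added elsewhere in this commit and not shown in this excerpt. A minimal sketch of the assumed behavior: nested schemas are flattened into dot-separated leaf names, and dots are escaped so the names can serve as flat column names (the "__" replacement token is an assumption, not necessarily what SchemaUtils uses):

import org.apache.spark.sql.types.StructType

// Sketch: collect dot-separated paths to all leaf fields of a (possibly nested) schema.
def flatten(schema: StructType, prefix: String = ""): Seq[String] =
  schema.fields.toSeq.flatMap { f =>
    val name = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case s: StructType => flatten(s, name)
      case _ => Seq(name)
    }
  }

// Sketch: replace dots so a flattened name is a legal flat column name.
def escapeFieldNames(names: Seq[String]): Seq[String] =
  names.map(_.replace(".", "__"))

// flatten(schema) might yield Seq("id", "payload.kind", "payload.value");
// escapeFieldNames(Seq("payload.kind")) then yields Seq("payload__kind").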
@@ -19,13 +19,15 @@ package com.microsoft.hyperspace.actions
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions.{col, input_file_name}
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils}

/**
* CreateActionBase provides functionality to write dataframe as covering index.
@@ -73,7 +75,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s)))))

val coveringIndexProperties =
(hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation)).toMap
(hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation) ++
usesNestedFieldsProperty(indexConfig)).toMap

IndexLogEntry(
indexConfig.indexName,
@@ -92,23 +95,6 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
}
}

private def hasParquetAsSourceFormatProperty(
relation: FileBasedRelation): Option[(String, String)] = {
if (relation.hasParquetAsSourceFormat) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
}
}

private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
if (hasLineage(spark)) {
Some(IndexConstants.LINEAGE_PROPERTY -> "true")
} else {
None
}
}

protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
val numBuckets = numBucketsForIndex(spark)

@@ -117,7 +103,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

// run job
val repartitionedIndexDataFrame =
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(df(_)): _*)
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"$c")): _*)

// Save the index with the number of buckets specified.
repartitionedIndexDataFrame.write
@@ -140,13 +126,38 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
relations.head
}

private def hasParquetAsSourceFormatProperty(
relation: FileBasedRelation): Option[(String, String)] = {
if (relation.hasParquetAsSourceFormat) {
Some(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true")
} else {
None
}
}

private def hasLineageProperty(spark: SparkSession): Option[(String, String)] = {
if (hasLineage(spark)) {
Some(IndexConstants.LINEAGE_PROPERTY -> "true")
} else {
None
}
}

private def usesNestedFieldsProperty(indexConfig: IndexConfig): Option[(String, String)] = {
if (SchemaUtils.hasNestedFields(indexConfig.indexedColumns ++ indexConfig.includedColumns)) {
Some(IndexConstants.USES_NESTED_FIELDS_PROPERTY -> "true")
} else {
None
}
}
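
The property above is driven by SchemaUtils.hasNestedFields, also outside this excerpt. A one-line sketch of that check, assuming any configured column name containing a dot counts as a nested reference:

// Sketch: a column name containing a dot is treated as a nested reference.
def hasNestedFields(columns: Seq[String]): Boolean = columns.exists(_.contains("."))
// hasNestedFields(Seq("id", "payload.kind")) == true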

private def resolveConfig(
df: DataFrame,
indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
val spark = df.sparkSession
val dfColumnNames = df.schema.fieldNames
val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
val dfColumnNames = SchemaUtils.flatten(df.schema)
val indexedColumns = SchemaUtils.unescapeFieldNames(indexConfig.indexedColumns)
val includedColumns = SchemaUtils.unescapeFieldNames(indexConfig.includedColumns)
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)

@@ -177,8 +188,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// 2. If source data is partitioned, all partitioning key(s) are added to index schema
// as columns if they are not already part of the schema.
val partitionColumns = relation.partitionSchema.map(_.name)
val missingPartitionColumns = partitionColumns.filter(
ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val missingPartitionColumns =
partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns

// File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
@@ -202,10 +213,16 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
.select(
allIndexColumns.head,
allIndexColumns.tail :+ IndexConstants.DATA_FILE_NAME_ID: _*)
.toDF(
SchemaUtils.escapeFieldNames(allIndexColumns) :+ IndexConstants.DATA_FILE_NAME_ID: _*)
} else {
df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
.toDF(SchemaUtils.escapeFieldNames(columnsFromIndexConfig): _*)
}

(indexDF, resolvedIndexedColumns, resolvedIncludedColumns)
val escapedIndexedColumns = SchemaUtils.escapeFieldNames(resolvedIndexedColumns)
val escapedIncludedColumns = SchemaUtils.escapeFieldNames(resolvedIncludedColumns)

(indexDF, escapedIndexedColumns, escapedIncludedColumns)
}
}
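
The select/toDF pair in the index-data preparation above is what materializes nested values as flat, escaped top-level columns in the written index. A standalone sketch of that pattern on a plain DataFrame (column names hypothetical, "__" escaping assumed):

import org.apache.spark.sql.functions.col

// df schema (hypothetical): id: long, payload: struct<kind: string, value: long>
val indexDF = df
  .select(col("payload.kind"), col("id")) // pull the nested value out of the struct
  .toDF("payload__kind", "id")            // rename to the escaped, flattened name

// indexDF schema: payload__kind: string, id: long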
@@ -109,4 +109,7 @@ object IndexConstants {
// To provide multiple paths in the globbing pattern, separate them with commas, e.g.
// "/temp/1/*, /temp/2/*"
val GLOBBING_PATTERN_KEY = "spark.hyperspace.source.globbingPattern"

// Indicates whether the index has been built over nested fields.
private[hyperspace] val USES_NESTED_FIELDS_PROPERTY = "hasNestedFields"
}
@@ -557,6 +557,11 @@ case class IndexLogEntry(
config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
}

def usesNestedFields: Boolean = {
derivedDataset.properties.properties.getOrElse(
IndexConstants.USES_NESTED_FIELDS_PROPERTY, "false").toBoolean
}

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
@@ -16,19 +16,22 @@

package com.microsoft.hyperspace.index.rules

import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.index.rankers.FilterIndexRanker
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, ResolverUtils, SchemaUtils}

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -113,8 +116,8 @@ object FilterIndexRule

val candidateIndexes = allIndexes.filter { index =>
indexCoversPlan(
outputColumns,
filterColumns,
SchemaUtils.escapeFieldNames(outputColumns),
SchemaUtils.escapeFieldNames(filterColumns),
index.indexedColumns,
index.includedColumns)
}
@@ -168,9 +171,17 @@ object ExtractFilterNode {
val projectColumnNames = CleanupAliases(project)
.asInstanceOf[Project]
.projectList
.map(_.references.map(_.asInstanceOf[AttributeReference].name))
.map(extractNamesFromExpression)
.flatMap(_.toSeq)
val filterColumnNames = condition.references.map(_.name).toSeq
val filterColumnNames = extractNamesFromExpression(condition).toSeq
.sortBy(-_.length)
.foldLeft(Seq.empty[String]) { (acc, e) =>
if (!acc.exists(i => i.startsWith(e))) {
acc :+ e
} else {
acc
}
}

Some(project, filter, projectColumnNames, filterColumnNames)

@@ -183,6 +194,96 @@ object ExtractFilterNode {

case _ => None // plan does not match with any of filter index rule patterns
}

def extractNamesFromExpression(exp: Expression): Set[String] = {
exp match {
case AttributeReference(name, _, _, _) =>
Set(s"$name")
case otherExp =>
otherExp.containsChild.map {
case g: GetStructField =>
s"${getChildNameFromStruct(g)}"
case e: Expression =>
extractNamesFromExpression(e).filter(_.nonEmpty).mkString(".")
case _ => ""
}
}
}

def getChildNameFromStruct(field: GetStructField): String = {
field.child match {
case f: GetStructField =>
s"${getChildNameFromStruct(f)}.${field.name.get}"
case a: AttributeReference =>
s"${a.name}.${field.name.get}"
case _ =>
s"${field.name.get}"
}
}
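
A small sketch of what getChildNameFromStruct above is expected to recover when the struct access is built directly from Catalyst expressions (field names hypothetical):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val payload = AttributeReference(
  "payload", StructType(Seq(StructField("kind", StringType))))()
val access = GetStructField(payload, 0, Some("kind"))

// getChildNameFromStruct(access) should return "payload.kind"; a GetStructField
// whose child is another GetStructField would recurse and produce e.g. "a.b.kind".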

def extractSearchQuery(exp: Expression, name: String): (Expression, Expression) = {
val splits = name.split(SchemaUtils.NESTED_FIELD_NEEDLE_REGEX)
val expFound = exp.find {
case a: AttributeReference if splits.forall(s => a.name.contains(s)) => true
case f: GetStructField if splits.forall(s => f.toString().contains(s)) => true
case _ => false
}.get
val parent = exp.find {
case e: Expression if e.containsChild.contains(expFound) => true
case _ => false
}.get
(parent, expFound)
}

def replaceInSearchQuery(
parent: Expression,
needle: Expression,
repl: Expression): Expression = {
parent.mapChildren { c =>
if (c == needle) {
repl
} else {
c
}
}
}

def extractAttributeRef(exp: Expression, name: String): AttributeReference = {
val splits = name.split(SchemaUtils.NESTED_FIELD_NEEDLE_REGEX)
val elem = exp.find {
case a: AttributeReference if splits.contains(a.name) => true
case _ => false
}
elem.get.asInstanceOf[AttributeReference]
}

def extractTypeFromExpression(exp: Expression, name: String): DataType = {
val splits = name.split(SchemaUtils.NESTED_FIELD_NEEDLE_REGEX)
val elem = exp.flatMap {
case a: AttributeReference =>
if (splits.forall(s => a.name == s)) {
Some((name, a.dataType))
} else {
Try({
val h :: t = splits.toList
if (a.name == h && a.dataType.isInstanceOf[StructType]) {
val currentDataType = a.dataType.asInstanceOf[StructType]
val foldedFields = t.foldLeft(Seq.empty[(String, DataType)]) { (acc, i) =>
val idx = currentDataType.indexWhere(_.name.equalsIgnoreCase(i))
acc :+ (i, currentDataType(idx).dataType)
}
Some(foldedFields.last)
} else {
None
}
}).getOrElse(None)
}
case f: GetStructField if splits.forall(s => f.toString().contains(s)) =>
Some((name, f.dataType))
case _ => None
}
elem.find(e => e._1 == name || e._1 == splits.last).get._2
}
}

object ExtractRelation extends ActiveSparkSession {
