diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 65c99ff1e..8f77e7aa0 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -17,17 +17,24 @@ jobs: versionSpec: '8' jdkArchitectureOption: 'x64' jdkSourceOption: 'PreInstalled' - - script: sbt ++2.11.12 clean + # Use sbt 1.4.9. The default sbt launcher in ubuntu-18.04 20210405 image is + # 1.5.0, but the version has an issue to compile with 0.13.18. + # See: https://github.com/sbt/sbt/issues/6447 + - script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz" + displayName: 'Download sbt 1.4.9' + - script: tar zxf /tmp/sbt.tgz -C /tmp/ + displayName: 'Extract sbt' + - script: /tmp/sbt//bin/sbt ++2.11.12 clean displayName: 'Running $sbt clean' - - script: sbt ++2.11.12 update + - script: /tmp/sbt/bin/sbt ++2.11.12 update displayName: 'Running $sbt update' - - script: sbt ++2.11.12 compile + - script: /tmp/sbt/bin/sbt ++2.11.12 compile displayName: 'Running $sbt compile' - - script: sbt ++2.11.12 test + - script: /tmp/sbt/bin/sbt ++2.11.12 test displayName: 'Running $sbt test' # If not a pull request, publish artifacts. - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: - - script: sbt ++2.11.12 package + - script: /tmp/sbt/bin/sbt ++2.11.12 package displayName: 'Running $sbt package' - task: CopyFiles@2 displayName: 'Copy hyperspace-core JAR' @@ -46,17 +53,21 @@ jobs: pool: vmImage: 'ubuntu-18.04' steps: - - script: sbt ++2.12.8 clean + - script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz" + displayName: 'Download sbt 1.4.9' + - script: tar zxf /tmp/sbt.tgz -C /tmp/ + displayName: 'Extract sbt' + - script: /tmp/sbt/bin/sbt ++2.12.8 clean displayName: 'Running $sbt clean' - - script: sbt ++2.12.8 update + - script: /tmp/sbt/bin/sbt ++2.12.8 update displayName: 'Running $sbt update' - - script: sbt ++2.12.8 compile + - script: /tmp/sbt/bin/sbt ++2.12.8 compile displayName: 'Running $sbt compile' - - script: sbt ++2.12.8 test + - script: /tmp/sbt/bin/sbt ++2.12.8 test displayName: 'Running $sbt test' # If not a pull request, publish artifacts. - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: - - script: sbt ++2.12.8 package + - script: /tmp/sbt/bin/sbt ++2.12.8 package displayName: 'Running $sbt package' - task: CopyFiles@2 displayName: 'Copy hyperspace-core JAR' @@ -86,11 +97,15 @@ jobs: versionSpec: '8' jdkArchitectureOption: 'x64' jdkSourceOption: 'PreInstalled' - - script: sbt ++2.11.12 clean + - script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz" + displayName: 'Download sbt 1.4.9' + - script: tar zxf /tmp/sbt.tgz -C /tmp/ + displayName: 'Extract sbt' + - script: /tmp/sbt/bin/sbt ++2.11.12 clean displayName: 'Running $sbt clean' - - script: sbt ++2.11.12 update + - script: /tmp/sbt/bin/sbt ++2.11.12 update displayName: 'Running $sbt update' - - script: sbt ++2.11.12 compile + - script: /tmp/sbt/bin/sbt ++2.11.12 compile displayName: 'Running $sbt compile' - task: Bash@3 inputs: diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 26802055e..bbe6f4e45 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -92,7 +92,7 @@ This file is divided into 3 sections: */ \E)?\Q/* - * Copyright (2020) The Hyperspace Project Authors. + * Copyright (\E(?:2020|2021)\Q) The Hyperspace Project Authors. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index e74fdd670..ff4affa25 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -25,7 +25,8 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer import com.microsoft.hyperspace.index.sources.FileBasedRelation -import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils} +import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils} +import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn /** * CreateActionBase provides functionality to write dataframe as covering index. @@ -83,17 +84,19 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) Hyperspace .getContext(spark) .sourceProviderManager + .getRelationMetadata(sourcePlanProperties.relations.head) .enrichIndexProperties( - sourcePlanProperties.relations.head, prevIndexProperties + (IndexConstants.INDEX_LOG_VERSION -> versionId.toString) ++ hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation)) - IndexLogEntry( + IndexLogEntry.create( indexConfig.indexName, CoveringIndex( CoveringIndex.Properties( CoveringIndex.Properties - .Columns(resolvedIndexedColumns, resolvedIncludedColumns), + .Columns( + resolvedIndexedColumns.map(_.normalizedName), + resolvedIncludedColumns.map(_.normalizedName)), IndexLogEntry.schemaString(indexDataFrame.schema), numBuckets, coveringIndexProperties)), @@ -113,10 +116,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) // Run job val repartitionedIndexDataFrame = { - // For nested fields, resolvedIndexedColumns will have flattened names with `.` (dots), - // thus they need to be enclosed in backticks to access them as top-level columns. - // Note that backticking the non-nested columns is a no-op. - indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"`$c`")): _*) + // We are repartitioning with normalized columns (e.g., flattened nested column). + indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(_.toNormalizedColumn): _*) } // Save the index with the number of buckets specified. 
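For clarity on what the `normalizedName` mapping introduced above produces, the following is a minimal, self-contained sketch (not the project's actual `ResolvedColumn` class, which is added later in this diff in ResolverUtils.scala) of how nested field names are flattened and prefixed before being used as top-level index columns:

object NormalizedNameSketch {
  // Same prefix as ResolverUtils.ResolvedColumn.NESTED_FIELD_PREFIX in this diff.
  private val NestedFieldPrefix = "__hs_nested."

  // Nested fields keep their dotted path but gain the prefix; top-level names pass through.
  def normalizedName(name: String, isNested: Boolean): String =
    if (isNested) s"$NestedFieldPrefix$name" else name

  def main(args: Array[String]): Unit = {
    // "clicks" is a hypothetical top-level column and stays as-is; "nested.leaf.cnt"
    // (a hypothetical nested field) becomes "__hs_nested.nested.leaf.cnt", which is why
    // the repartition/write calls above quote it with backticks so Spark treats it as a
    // single flat column rather than a struct access.
    println(normalizedName("clicks", isNested = false))         // clicks
    println(normalizedName("nested.leaf.cnt", isNested = true)) // __hs_nested.nested.leaf.cnt
  }
}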
@@ -125,7 +126,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) repartitionedIndexDataFrame, indexDataPath.toString, numBuckets, - resolvedIndexedColumns, + resolvedIndexedColumns.map(_.normalizedName), SaveMode.Overwrite) } @@ -159,7 +160,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) private def resolveConfig( df: DataFrame, - indexConfig: IndexConfig): (Seq[(String, Boolean)], Seq[(String, Boolean)]) = { + indexConfig: IndexConfig): (Seq[ResolvedColumn], Seq[ResolvedColumn]) = { val spark = df.sparkSession val plan = df.queryExecution.analyzed val indexedColumns = indexConfig.indexedColumns @@ -171,7 +172,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) case (Some(indexed), Some(included)) => (indexed, included) case _ => val unresolvedColumns = (indexedColumns ++ includedColumns) - .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_._1)))) + .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name)))) .collect { case (c, r) if r.isEmpty => c } throw HyperspaceException( s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " + @@ -182,14 +183,9 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) private def prepareIndexDataFrame( spark: SparkSession, df: DataFrame, - indexConfig: IndexConfig): (DataFrame, Seq[String], Seq[String]) = { + indexConfig: IndexConfig): (DataFrame, Seq[ResolvedColumn], Seq[ResolvedColumn]) = { val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig) - val columnsFromIndexConfig = - resolvedIndexedColumns.map(_._1) ++ resolvedIncludedColumns.map(_._1) - - val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns) - val prefixedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns) - val prefixedColumnsFromIndexConfig = prefixedIndexedColumns ++ prefixedIncludedColumns + val projectColumns = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.toColumn) val indexDF = if (hasLineage(spark)) { val relation = getRelation(spark, df) @@ -198,10 +194,12 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) // 1. DATA_FILE_ID_COLUMN column contains source data file id for each index record. // 2. If source data is partitioned, all partitioning key(s) are added to index schema // as columns if they are not already part of the schema. - val partitionColumns = relation.partitionSchema.map(_.name) + val partitionColumnNames = relation.partitionSchema.map(_.name) + val resolvedColumnNames = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.name) val missingPartitionColumns = - partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty) - val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns + partitionColumnNames + .filter(ResolverUtils.resolve(spark, _, resolvedColumnNames).isEmpty) + .map(col(_)) // File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a // Long data type value. 
Each source data file has a unique file id, assigned by @@ -218,25 +216,15 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) val dataPathColumn = "_data_path" val lineagePairs = relation.lineagePairs(fileIdTracker) val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) - val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns df.withColumn(dataPathColumn, input_file_name()) .join(lineageDF.hint("broadcast"), dataPathColumn) - .select(prepareColumns(allIndexColumns, prefixedAllIndexColumns) :+ - col(IndexConstants.DATA_FILE_NAME_ID): _*) + .select( + projectColumns ++ missingPartitionColumns :+ col(IndexConstants.DATA_FILE_NAME_ID): _*) } else { - df.select(prepareColumns(columnsFromIndexConfig, prefixedColumnsFromIndexConfig): _*) + df.select(projectColumns: _*) } - (indexDF, prefixedIndexedColumns, prefixedIncludedColumns) - } - - private def prepareColumns( - originalColumns: Seq[String], - prefixedColumns: Seq[String]): Seq[Column] = { - originalColumns.zip(prefixedColumns).map { - case (original, prefixed) => - col(original).as(prefixed) - } + (indexDF, resolvedIndexedColumns, resolvedIncludedColumns) } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala index cab3432a2..acf513644 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala @@ -16,7 +16,6 @@ package com.microsoft.hyperspace.actions -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.execution.datasources.BucketingUtils @@ -148,8 +147,8 @@ class OptimizeAction( properties = Hyperspace .getContext(spark) .sourceProviderManager + .getRelationMetadata(previousIndexLogEntry.relations.head) .enrichIndexProperties( - previousIndexLogEntry.relations.head, prevIndexProperties + (IndexConstants.INDEX_LOG_VERSION -> endId.toString)))) if (filesToIgnore.nonEmpty) { diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index ffabf1cd1..e2dfd04a4 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING} import com.microsoft.hyperspace.index._ -import com.microsoft.hyperspace.util.SchemaUtils +import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn /** * Base abstract class containing common code for different types of index refresh actions. 
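The hunks above (CreateActionBase and OptimizeAction) change the call shape from `sourceProviderManager.enrichIndexProperties(relation, props)` to `sourceProviderManager.getRelationMetadata(relation).enrichIndexProperties(props)`. Below is a simplified, self-contained sketch of that metadata-centric interface; the trait and class names mirror the diff but are stripped-down stand-ins, not the project's actual types:

trait RelationMetadataSketch {
  // File format used to read the index's internal data files.
  def internalFileFormatName(): String
  // Source-specific additions to the index properties at create/refresh time.
  def enrichIndexProperties(properties: Map[String, String]): Map[String, String]
}

class DefaultRelationMetadataSketch(fileFormat: String) extends RelationMetadataSketch {
  // Built-in Spark sources keep their own format and add no extra properties.
  override def internalFileFormatName(): String = fileFormat
  override def enrichIndexProperties(properties: Map[String, String]): Map[String, String] =
    properties
}

object RelationMetadataSketchApp {
  def main(args: Array[String]): Unit = {
    // Callers now obtain the metadata object once and chain the per-source operations,
    // mirroring manager.getRelationMetadata(relation).enrichIndexProperties(props) above.
    val metadata: RelationMetadataSketch = new DefaultRelationMetadataSketch("parquet")
    println(metadata.internalFileFormatName())                             // parquet
    println(metadata.enrichIndexProperties(Map("indexLogVersion" -> "1"))) // Map(indexLogVersion -> 1)
  }
}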
@@ -71,8 +71,11 @@ private[actions] abstract class RefreshActionBase( // Reconstruct a df from schema protected lazy val df = { val relations = previousIndexLogEntry.relations - val latestRelation = - Hyperspace.getContext(spark).sourceProviderManager.refreshRelationMetadata(relations.head) + val latestRelation = Hyperspace + .getContext(spark) + .sourceProviderManager + .getRelationMetadata(relations.head) + .refresh() val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType] val df = spark.read .schema(dataSchema) @@ -95,8 +98,8 @@ private[actions] abstract class RefreshActionBase( previousIndexLogEntry.name, // As indexed & included columns in previousLogEntry are resolved & prefixed names, // need to remove the prefix to resolve with the dataframe for refresh. - SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed).map(_._1), - SchemaUtils.removePrefixNestedFieldNames(ddColumns.included).map(_._1)) + ddColumns.indexed.map(ResolvedColumn(_).name), + ddColumns.included.map(ResolvedColumn(_).name)) } final override val transientState: String = REFRESHING diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala index db05154bf..2b3c48b55 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshIncrementalAction.scala @@ -60,7 +60,8 @@ class RefreshIncrementalAction( val internalFileFormatName = Hyperspace .getContext(spark) .sourceProviderManager - .internalFileFormatName(previousIndexLogEntry.relations.head) + .getRelationMetadata(previousIndexLogEntry.relations.head) + .internalFileFormatName() // Create a df with only appended files from original list of files. 
val dfWithAppendedFiles = spark.read diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index 7814e3a28..c896e1317 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -138,6 +138,18 @@ class IndexCollectionManager( } } + override def getIndex(indexName: String, logVersion: Int): Option[IndexLogEntry] = { + withLogManager(indexName) { logManager => + logManager.getLog(logVersion).map(_.asInstanceOf[IndexLogEntry]) + } + } + + override def getIndexVersions(indexName: String, states: Seq[String]): Seq[Int] = { + withLogManager(indexName) { logManager => + logManager.getIndexVersions(states) + } + } + private def indexLogManagers: Seq[IndexLogManager] = { val hadoopConf = spark.sessionState.newHadoopConf() val rootPath = PathResolver(conf, hadoopConf).systemPath @@ -162,12 +174,6 @@ class IndexCollectionManager( } } - override def getIndex(indexName: String, logVersion: Int): Option[IndexLogEntry] = { - withLogManager(indexName) { logManager => - logManager.getLog(logVersion).map(_.asInstanceOf[IndexLogEntry]) - } - } - private def withLogManager[T](indexName: String)(f: IndexLogManager => T): T = { getLogManager(indexName) match { case Some(logManager) => f(logManager) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index ac7e24a95..5e32455d6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -102,6 +102,8 @@ object IndexConstants { private[hyperspace] val LINEAGE_PROPERTY = "lineage" // Indicate whether the source file format is parquet. private[hyperspace] val HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY = "hasParquetAsSourceFormat" + // Indicate Hyperspace version. + private[hyperspace] val HYPERSPACE_VERSION_PROPERTY: String = "hyperspaceVersion" // Indicate index log version. private[hyperspace] val INDEX_LOG_VERSION = "indexLogVersion" diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index bc6cbc39f..4d9227dc9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{DataType, StructType} -import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.{BuildInfo, HyperspaceException} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils} +import com.microsoft.hyperspace.util.PathUtils // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. case class NoOpFingerprint() { @@ -430,7 +430,12 @@ object SparkPlan { // IndexLogEntry-specific Source that uses SparkPlan as a plan. case class Source(plan: SparkPlan) -// IndexLogEntry that captures index-related information. +/** + * IndexLogEntry that captures index-related information. + * Don't use this method to create a new IndexLogEntry, unless you specify all hyperspace project + * default properties. 
+ * Refer to the `create` method of the IndexLogEntry companion object for further details.
+ */
 case class IndexLogEntry(
     name: String,
     derivedDataset: CoveringIndex,
@@ -518,6 +523,7 @@ case class IndexLogEntry(
       numBuckets.equals(that.numBuckets) &&
       content.root.equals(that.content.root) &&
       source.equals(that.source) &&
+      properties.equals(that.properties) &&
       state.equals(that.state)
     case _ => false
   }
@@ -610,6 +616,30 @@ object IndexLogEntry {
   val VERSION: String = "0.1"

   def schemaString(schema: StructType): String = schema.json
+
+  /**
+   * Use this method to create a new IndexLogEntry, which automatically includes
+   * all common default Hyperspace project properties.
+   * TODO: Force developers to use this method, as it takes into account all
+   * project properties that need to be added by default. Currently, developers can
+   * also create an IndexLogEntry directly from the case class.
+   * https://github.com/microsoft/hyperspace/issues/370
+   * Also add a require for the Hyperspace project version when we introduce a breaking change.
+   */
+  def create(
+      name: String,
+      derivedDataset: CoveringIndex,
+      content: Content,
+      source: Source,
+      properties: Map[String, String]): IndexLogEntry = {
+    IndexLogEntry(
+      name,
+      derivedDataset,
+      content,
+      source,
+      properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version))
+    )
+  }
 }

 /**
@@ -629,7 +659,7 @@ class FileIdTracker {

   def getMaxFileId: Long = maxId

-  def getFileToIdMap: HashMap[key, Long] = fileToIdMap
+  def getFileToIdMapping: Seq[(key, Long)] = fileToIdMap.toSeq

   def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
     fileToIdMap.get((path, size, modifiedTime))
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogManager.scala
index 453e4d6ee..022ab0a7f 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogManager.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogManager.scala
@@ -35,12 +35,14 @@ trait IndexLogManager {

   def getLatestId(): Option[Int]

-  // TODO: This should be marked as final - remove test dependency.
- def getLatestLog(): Option[LogEntry] = getLatestId().flatMap(getLog) + final def getLatestLog(): Option[LogEntry] = getLatestId().flatMap(getLog) /** Returns the latest LogEntry whose state is STABLE */ def getLatestStableLog(): Option[LogEntry] + /** Returns index log versions whose state is in the given states */ + def getIndexVersions(states: Seq[String]): Seq[Int] + /** update latest.json symlink to the given id/path */ def createLatestStableLog(id: Int): Boolean @@ -109,6 +111,21 @@ class IndexLogManagerImpl(indexPath: Path, hadoopConfiguration: Configuration = } } + override def getIndexVersions(states: Seq[String]): Seq[Int] = { + val latestId = getLatestId() + if (latestId.isDefined) { + (latestId.get to 0 by -1).map { id => + getLog(id) match { + case Some(entry) if states.contains(entry.state) => + Some(id) + case _ => None + } + }.flatten + } else { + Seq() + } + } + override def createLatestStableLog(id: Int): Boolean = { getLog(id) match { case Some(logEntry) if Constants.STABLE_STATES.contains(logEntry.state) => diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala index f8264f12c..3fc6dceb4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala @@ -113,4 +113,13 @@ trait IndexManager { * @return IndexLogEntry if the index of the given log version exists, otherwise None. */ def getIndex(indexName: String, logVersion: Int): Option[IndexLogEntry] + + /** + * Get index log version ids of the given index that match any of the given states. + * + * @param indexName Name of the index. + * @param states List of index states of interest. + * @return Index log versions. + */ + def getIndexVersions(indexName: String, states: Seq[String]): Seq[Int] } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala index c501c0557..ce7dc3e7f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala @@ -305,8 +305,8 @@ object JoinIndexRule val rRequiredIndexedCols = lRMap.values.toSeq // All required columns resolved with base relation. 
- val lRequiredAllCols = resolve(spark, allRequiredCols(left), leftRelation.plan).get.map(_._1) - val rRequiredAllCols = resolve(spark, allRequiredCols(right), rightRelation.plan).get.map(_._1) + val lRequiredAllCols = resolve(spark, allRequiredCols(left), lBaseAttrs).get + val rRequiredAllCols = resolve(spark, allRequiredCols(right), rBaseAttrs).get // Make sure required indexed columns are subset of all required columns for a subplan require(resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 087f6559b..98f5625f3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -58,9 +58,7 @@ object RuleUtils { def signatureValid(entry: IndexLogEntry): Boolean = { entry.withCachedTag(relation.plan, IndexLogEntryTags.SIGNATURE_MATCHED) { - val sourcePlanSignatures = entry.source.plan.properties.fingerprint.properties.signatures - assert(sourcePlanSignatures.length == 1) - val sourcePlanSignature = sourcePlanSignatures.head + val sourcePlanSignature = entry.signature signatureMap.getOrElseUpdate( sourcePlanSignature.provider, diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 5b1763f25..49dbf0a3e 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -43,28 +43,6 @@ class FileBasedSourceProviderManager(spark: SparkSession) { buildProviders(builderClassNames) }) - /** - * Runs refreshRelationMetadata() for each provider. - * - * @param relation [[Relation]] to refresh. - * @return Refreshed [[Relation]]. - * @throws [[HyperspaceException]] if multiple providers returns [[Some]] or - * if no providers return [[Some]]. - */ - def refreshRelationMetadata(relation: Relation): Relation = { - run(p => p.refreshRelationMetadata(relation)) - } - - /** - * Runs internalFileFormatName() for each provider. - * - * @param relation [[Relation]] object to read internal data files. - * @return File format to read internal data files. - */ - def internalFileFormatName(relation: Relation): String = { - run(p => p.internalFileFormatName(relation)) - } - /** * Returns true if the given logical plan is a supported relation. If all of the registered * providers return None, this returns false. (e.g, the given plan could be RDD-based relation, @@ -91,16 +69,27 @@ class FileBasedSourceProviderManager(spark: SparkSession) { } /** - * Returns enriched index properties. + * Returns true if the given relation metadata is a supported relation metadata. If all of the + * registered providers return None, this returns false. + * + * @param metadata Relation metadata to check if it's supported. + * @return True if the given plan is supported relation metadata. + */ + def isSupportedRelationMetadata(metadata: Relation): Boolean = { + runWithDefault(p => p.isSupportedRelationMetadata(metadata))(false) + } + + /** + * Returns the [[FileBasedRelationMetadata]] that wraps the given relation metadata. + * If you are using this from an extractor, check if the relation metadata + * is supported first by using [[isSupportedRelationMetadata]]. 
Otherwise, HyperspaceException + * can be thrown if none of the registered providers supports the given relation metadata. * - * @param relation Relation to retrieve necessary information. - * @param properties Index properties to enrich. - * @return New property entries for index creation or refresh. + * @param metadata Relation metadata to wrap to [[FileBasedRelationMetadata]] + * @return [[FileBasedRelationMetadata]] that wraps the given relation metadata. */ - def enrichIndexProperties( - relation: Relation, - properties: Map[String, String]): Map[String, String] = { - run(p => p.enrichIndexProperties(relation, properties)) + def getRelationMetadata(metadata: Relation): FileBasedRelationMetadata = { + run(p => p.getRelationMetadata(metadata)) } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala index 9b845cbdf..a733f1106 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala @@ -233,7 +233,7 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe * @return List of pairs of (file path, file id). */ override def lineagePairs(fileIdTracker: FileIdTracker): Seq[(String, Long)] = { - fileIdTracker.getFileToIdMap.toSeq.map { kv => + fileIdTracker.getFileToIdMapping.map { kv => (kv._1._1.replace("file:/", "file:///"), kv._2) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelationMetadata.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelationMetadata.scala new file mode 100644 index 000000000..1128cf1a8 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelationMetadata.scala @@ -0,0 +1,39 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.sources.default + +import com.microsoft.hyperspace.index.Relation +import com.microsoft.hyperspace.index.sources.FileBasedRelationMetadata + +/** + * Default file-based relation metadata implementation for file-based Spark built-in sources + */ +class DefaultFileBasedRelationMetadata(metadata: Relation) extends FileBasedRelationMetadata { + + override def refresh(): Relation = { + // No change is needed because rootPaths will be pointing to the latest source files. 
+ metadata + } + + override def internalFileFormatName(): String = { + metadata.fileFormat + } + + override def enrichIndexProperties(properties: Map[String, String]): Map[String, String] = { + properties + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index 40dae1a0b..7f507eb09 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -18,13 +18,13 @@ package com.microsoft.hyperspace.index.sources.default import java.util.Locale -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.DataSourceRegister import com.microsoft.hyperspace.index.Relation -import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} +import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedRelationMetadata, FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf} /** @@ -65,37 +65,6 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS supportedFormats.load().contains(name.toLowerCase(Locale.ROOT)) } - /** - * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. - * - * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. - * @return [[Relation]] object if the given 'relation' can be processed by this provider. - * Otherwise, None. - */ - override def refreshRelationMetadata(relation: Relation): Option[Relation] = { - if (isSupportedFileFormatName(relation.fileFormat)) { - // No change is needed because rootPaths will be pointing to the latest source files. - Some(relation) - } else { - None - } - } - - /** - * Returns a file format name to read internal data files for a given [[Relation]]. - * - * @param relation [[Relation]] object to read internal data files. - * @return File format to read internal data files. - */ - override def internalFileFormatName(relation: Relation): Option[String] = { - if (isSupportedFileFormatName(relation.fileFormat)) { - // Same as original file format. - Some(relation.fileFormat) - } else { - None - } - } - /** * Returns true if the given logical plan is a supported relation. * @@ -119,28 +88,25 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS * @param plan Logical plan to wrap to [[FileBasedRelation]] * @return [[FileBasedRelation]] that wraps the given logical plan. */ - def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = plan match { - case l @ LogicalRelation( - HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, fileFormat, _), - _, - _, - _) if isSupportedFileFormat(fileFormat) => - Some(new DefaultFileBasedRelation(spark, l)) - case _ => None + def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = { + if (isSupportedRelation(plan).contains(true)) { + Some(new DefaultFileBasedRelation(spark, plan.asInstanceOf[LogicalRelation])) + } else { + None + } } - /** - * Returns enriched index properties. - * - * @param relation Relation to retrieve necessary information. 
- * @param properties Index properties to enrich. - * @return Updated index properties for index creation or refresh. - */ - override def enrichIndexProperties( - relation: Relation, - properties: Map[String, String]): Option[Map[String, String]] = { - if (isSupportedFileFormatName(relation.fileFormat)) { - Some(properties) + override def isSupportedRelationMetadata(metadata: Relation): Option[Boolean] = { + if (isSupportedFileFormatName(metadata.fileFormat)) { + Some(true) + } else { + None + } + } + + override def getRelationMetadata(metadata: Relation): Option[FileBasedRelationMetadata] = { + if (isSupportedRelationMetadata(metadata).contains(true)) { + Some(new DefaultFileBasedRelationMetadata(metadata)) } else { None } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala index cbf6b5203..4935a5af8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala @@ -16,13 +16,13 @@ package com.microsoft.hyperspace.index.sources.delta -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import com.microsoft.hyperspace.index.{IndexConstants, Relation} -import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} +import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedRelationMetadata, FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} object DeltaLakeConstants { val DELTA_FORMAT_STR = "delta" @@ -39,35 +39,6 @@ object DeltaLakeConstants { */ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { - /** - * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. - * - * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. - * @return [[Relation]] object if the given 'relation' can be processed by this provider. - * Otherwise, None. - */ - override def refreshRelationMetadata(relation: Relation): Option[Relation] = { - if (relation.fileFormat.equals(DeltaLakeConstants.DELTA_FORMAT_STR)) { - Some(relation.copy(options = relation.options - "versionAsOf" - "timestampAsOf")) - } else { - None - } - } - - /** - * Returns a file format name to read internal data files for a given [[Relation]]. - * - * @param relation [[Relation]] object to read internal data files. - * @return File format to read internal data files. - */ - override def internalFileFormatName(relation: Relation): Option[String] = { - if (relation.fileFormat.equals(DeltaLakeConstants.DELTA_FORMAT_STR)) { - Some("parquet") - } else { - None - } - } - /** * Returns true if the given logical plan is a relation for Delta Lake. * @@ -87,38 +58,27 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase * @param plan Logical plan to wrap to [[FileBasedRelation]] * @return [[FileBasedRelation]] that wraps the given logical plan. 
*/ - def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = plan match { - case l @ LogicalRelation(HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _), _, _, _) => - Some(new DeltaLakeRelation(spark, l)) - case _ => None + def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = { + if (isSupportedRelation(plan).contains(true)) { + Some(new DeltaLakeRelation(spark, plan.asInstanceOf[LogicalRelation])) + } else { + None + } } - /** - * Returns enriched index properties. - * - * Delta Lake source provider adds: - * 1) DELTA_VERSION_HISTORY_PROPERTY logs the history of INDEX_VERSION:DELTA_TABLE_VERSION - * values for each index creation & refresh. - * - * @param relation Relation to retrieve necessary information. - * @param properties Index properties to enrich. - * @return Update index properties for index creation or refresh. - */ - override def enrichIndexProperties( - relation: Relation, - properties: Map[String, String]): Option[Map[String, String]] = { - if (!relation.fileFormat.equals(DeltaLakeConstants.DELTA_FORMAT_STR)) { + override def isSupportedRelationMetadata(metadata: Relation): Option[Boolean] = { + if (metadata.fileFormat.equals(DeltaLakeConstants.DELTA_FORMAT_STR)) { + Some(true) + } else { None + } + } + + override def getRelationMetadata(metadata: Relation): Option[FileBasedRelationMetadata] = { + if (isSupportedRelationMetadata(metadata).contains(true)) { + Some(new DeltaLakeRelationMetadata(metadata)) } else { - val indexVersion = properties(IndexConstants.INDEX_LOG_VERSION) - val deltaVerHistory = relation.options.get("versionAsOf").map { deltaVersion => - val newVersionMapping = s"$indexVersion:$deltaVersion" - DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY -> - properties.get(DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY).map { prop => - s"$prop,$newVersionMapping" - }.getOrElse(newVersionMapping) - } - Some(properties ++ deltaVerHistory) + None } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala index 5a8d591c3..0730979af 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala @@ -23,9 +23,10 @@ import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import com.microsoft.hyperspace.Hyperspace -import com.microsoft.hyperspace.index.{Content, FileIdTracker, FileInfo, Hdfs, IndexLogEntry, Relation} +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexLogEntry, Relation} import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedRelation -import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils} /** * Implementation for file-based relation used by [[DeltaLakeFileBasedSource]] @@ -131,7 +132,7 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) * @return List of pairs of (file path, file id). 
*/ override def lineagePairs(fileIdTracker: FileIdTracker): Seq[(String, Long)] = { - fileIdTracker.getFileToIdMap.toSeq.map { kv => + fileIdTracker.getFileToIdMapping.map { kv => (kv._1._1, kv._2) } } @@ -184,8 +185,17 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) * @return IndexLogEntry of the closest version among available index versions. */ override def closestIndex(index: IndexLogEntry): IndexLogEntry = { + // Only support when Hybrid Scan is enabled for both appended and deleted files. + // TODO: Support time travel utilizing Hybrid Scan append-only. + // See https://github.com/microsoft/hyperspace/issues/408. + if (!(HyperspaceConf.hybridScanEnabled(spark) && + HyperspaceConf.hybridScanDeleteEnabled(spark) && + index.hasLineageColumn)) { + return index + } + // Seq of (index log version, delta lake table version) - val versions = deltaLakeVersionHistory(index) + val versionsHistory = deltaLakeVersionHistory(index) lazy val indexManager = Hyperspace.getContext(spark).indexCollectionManager def getIndexLogEntry(logVersion: Int): IndexLogEntry = { @@ -194,9 +204,12 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) .get .asInstanceOf[IndexLogEntry] } - // TODO: Currently assume all versions of index data exist. - // Need to check and remove candidate indexes. - // See https://github.com/microsoft/hyperspace/issues/387 + + val activeLogVersions = + indexManager.getIndexVersions(index.name, Seq(Constants.States.ACTIVE)) + val versions = versionsHistory.filter { + case (indexLogVersion, _) => activeLogVersions.contains(indexLogVersion) + } plan.relation match { case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => @@ -206,7 +219,7 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) if (equalOrLessThanLastIndex == versions.size - 1) { // The given table version is equal or larger than the latest index's. // Use the latest version. - index + getIndexLogEntry(versions.last._1) } else if (equalOrLessThanLastIndex == -1) { // The given table version is smaller than the version at index creation. // Use the initial version. diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala new file mode 100644 index 000000000..5464d187a --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala @@ -0,0 +1,56 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.sources.delta + +import com.microsoft.hyperspace.index.{IndexConstants, Relation} +import com.microsoft.hyperspace.index.sources.FileBasedRelationMetadata + +/** + * Implementation for file-based relation metadata used by [[DeltaLakeFileBasedSource]] + */ +class DeltaLakeRelationMetadata(metadata: Relation) extends FileBasedRelationMetadata { + + override def refresh(): Relation = { + metadata.copy(options = metadata.options - "versionAsOf" - "timestampAsOf") + } + + override def internalFileFormatName(): String = { + "parquet" + } + + /** + * Returns enriched index properties. + * + * Delta Lake source provider adds: + * 1) DELTA_VERSION_HISTORY_PROPERTY logs the history of INDEX_VERSION:DELTA_TABLE_VERSION + * values for each index creation & refresh. + * + * @param properties Index properties to enrich. + * @return Updated index properties for index creation or refresh. + */ + override def enrichIndexProperties(properties: Map[String, String]): Map[String, String] = { + val indexVersion = properties(IndexConstants.INDEX_LOG_VERSION) + val deltaVerHistory = metadata.options.get("versionAsOf").map { deltaVersion => + val newVersionMapping = s"$indexVersion:$deltaVersion" + DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY -> + properties.get(DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY).map { prop => + s"$prop,$newVersionMapping" + }.getOrElse(newVersionMapping) + } + properties ++ deltaVerHistory + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergFileBasedSource.scala index 82f9a6330..79574c1c0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergFileBasedSource.scala @@ -17,12 +17,12 @@ package com.microsoft.hyperspace.index.sources.iceberg import org.apache.iceberg.spark.source.IcebergSource -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import com.microsoft.hyperspace.index.Relation -import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} +import com.microsoft.hyperspace.index.sources.{FileBasedRelation, FileBasedRelationMetadata, FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} /** * Iceberg file-based source provider. @@ -35,35 +35,6 @@ class IcebergFileBasedSource(private val spark: SparkSession) extends FileBasedS private val ICEBERG_FORMAT_STR = "iceberg" - /** - * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. - * - * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. - * @return [[Relation]] object if the given 'relation' can be processed by this provider. - * Otherwise, None. - */ - override def refreshRelationMetadata(relation: Relation): Option[Relation] = { - if (relation.fileFormat.equals(ICEBERG_FORMAT_STR)) { - Some(relation.copy(options = relation.options - "snapshot-id" - "as-of-timestamp")) - } else { - None - } - } - - /** - * Returns a file format name to read internal data files for a given [[Relation]]. - * - * @param relation [[Relation]] object to read internal data files. - * @return File format to read internal data files. 
- */ - override def internalFileFormatName(relation: Relation): Option[String] = { - if (relation.fileFormat.equals(ICEBERG_FORMAT_STR)) { - Some("parquet") - } else { - None - } - } - /** * Returns true if the given logical plan is a relation for Iceberg. * @@ -83,24 +54,25 @@ class IcebergFileBasedSource(private val spark: SparkSession) extends FileBasedS * @param plan Logical plan to wrap to [[FileBasedRelation]] * @return [[FileBasedRelation]] that wraps the given logical plan. */ - override def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = plan match { - case l @ DataSourceV2Relation(_: IcebergSource, _, _, _, _) => - Some(new IcebergRelation(spark, l)) - case _ => None + override def getRelation(plan: LogicalPlan): Option[FileBasedRelation] = { + if (isSupportedRelation(plan).contains(true)) { + Some(new IcebergRelation(spark, plan.asInstanceOf[DataSourceV2Relation])) + } else { + None + } } - /** - * Returns enriched index properties. - * - * @param relation Relation to retrieve necessary information. - * @param properties Index properties to enrich. - * @return Updated index properties for index creation or refresh. - */ - override def enrichIndexProperties( - relation: Relation, - properties: Map[String, String]): Option[Map[String, String]] = { - if (relation.fileFormat.equals(ICEBERG_FORMAT_STR)) { - Some(properties) + override def isSupportedRelationMetadata(metadata: Relation): Option[Boolean] = { + if (metadata.fileFormat.equals(ICEBERG_FORMAT_STR)) { + Some(true) + } else { + None + } + } + + override def getRelationMetadata(metadata: Relation): Option[FileBasedRelationMetadata] = { + if (isSupportedRelationMetadata(metadata).contains(true)) { + Some(new IcebergRelationMetadata(metadata)) } else { None } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala index acec90b79..0357cc053 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala @@ -140,11 +140,11 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati // original file path: file:///path/to/file or file:/path/to/file // input_file_name(): /path/to/file if (Path.WINDOWS) { - fileIdTracker.getFileToIdMap.toSeq.map { kv => + fileIdTracker.getFileToIdMapping.map { kv => (kv._1._1.stripPrefix("file:/"), kv._2) } } else { - fileIdTracker.getFileToIdMap.toSeq.map { kv => + fileIdTracker.getFileToIdMapping.map { kv => (kv._1._1.replaceFirst("^file:/{1,3}", "/"), kv._2) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelationMetadata.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelationMetadata.scala new file mode 100644 index 000000000..17668bf88 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelationMetadata.scala @@ -0,0 +1,38 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.sources.iceberg + +import com.microsoft.hyperspace.index.Relation +import com.microsoft.hyperspace.index.sources.FileBasedRelationMetadata + +/** + * Implementation for file-based relation metadata used by [[IcebergFileBasedSource]] + */ +class IcebergRelationMetadata(metadata: Relation) extends FileBasedRelationMetadata { + + override def refresh(): Relation = { + metadata.copy(options = metadata.options - "snapshot-id" - "as-of-timestamp") + } + + override def internalFileFormatName(): String = { + "parquet" + } + + override def enrichIndexProperties(properties: Map[String, String]): Map[String, String] = { + properties + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 9e857c5e7..336596826 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -183,27 +183,6 @@ trait SourceProviderBuilder { */ trait FileBasedSourceProvider extends SourceProvider { - /** - * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. - * - * This API is used when an index is refreshed. - * - * If the given relation does not belong to this provider, None should be returned. - * - * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. - * @return [[Relation]] object if the given 'relation' can be processed by this provider. - * Otherwise, None. - */ - def refreshRelationMetadata(relation: Relation): Option[Relation] - - /** - * Returns a file format name to read internal data for a given [[Relation]]. - * - * @param relation [[Relation]] object to read internal data files. - * @return File format to read internal data files. - */ - def internalFileFormatName(relation: Relation): Option[String] - /** * Returns true if the given logical plan is a supported relation. * @@ -222,13 +201,55 @@ trait FileBasedSourceProvider extends SourceProvider { def getRelation(plan: LogicalPlan): Option[FileBasedRelation] /** - * Returns enriched index properties. + * Returns true if the given relation metadata is supported relation metadata. + * + * @param metadata Relation metadata to check if it's supported. + * @return Some(true) if the given relation metadata is supported relation metadata, otherwise + * None. + */ + def isSupportedRelationMetadata(metadata: Relation): Option[Boolean] + + /** + * Returns the [[FileBasedRelationMetadata]] that wraps the given relation metadata + * if the given relation metadata is supported relation metadata + * + * @param metadata Relation metadata to wrap to [[FileBasedRelationMetadata]] + * @return [[FileBasedRelationMetadata]] that wraps the given relation metadata + */ + def getRelationMetadata(metadata: Relation): Option[FileBasedRelationMetadata] +} + +/** + * ::Experimental:: + * A trait that represents relation metadata for a source provider. 
+ * + * @since 0.5.0 + */ +trait SourceRelationMetadata + +/** + * ::Experimental:: + * A trait that a source provider should implement to represent the source relation metadata + * that is based on files. + * + * @since 0.5.0 + */ +trait FileBasedRelationMetadata extends SourceRelationMetadata { + + /** + * Returns new [[Relation]] metadata that will have the latest source. + */ + def refresh(): Relation + + /** + * Returns file format name to read internal data. + */ + def internalFileFormatName(): String + + /** + * Returns updated index properties for index creation or refresh. * - * @param relation Logical relation to retrieve necessary information. * @param properties Index properties to enrich. - * @return Updated index properties for index creation or refresh. */ - def enrichIndexProperties( - relation: Relation, - properties: Map[String, String]): Option[Map[String, String]] + def enrichIndexProperties(properties: Map[String, String]): Map[String, String] } diff --git a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala index 0cf53b82a..b198711b7 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/ResolverUtils.scala @@ -16,19 +16,91 @@ package com.microsoft.hyperspace.util -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetArrayStructFields, GetMapValue, GetStructField} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn.NESTED_FIELD_PREFIX /** * [[ResolverUtils]] provides utility functions to resolve strings based on spark's resolver. */ object ResolverUtils { + /** + * [[ResolvedColumn]] stores information when a column name is resolved against the + * analyzed plan and its schema. + * + * Outside unit tests, this object should not be created directly, but via the `resolve` function, + * or `ResolvedColumn.apply` with a normalized name. + * + * @param name The column name resolved from an analyzed plan. + * @param isNested Flag to denote if this column is nested or not. + */ + private[hyperspace] case class ResolvedColumn(name: String, isNested: Boolean) { + assert(!isNested || (name.contains(".") && !name.startsWith(NESTED_FIELD_PREFIX))) + // Quotes will be removed from `resolve` and nested columns with quotes (e.g., "a.`b.c`.d") + // are not supported. + assert(!name.contains("`")) + + // For nested fields, the column name is prefixed with `NESTED_FIELD_PREFIX`. + lazy val normalizedName = { + if (isNested) { + s"$NESTED_FIELD_PREFIX$name" + } else { + name + } + } + + /** + * Create a column using the resolved name. Top level column names are quoted, and + * nested column names are aliased with normalized names. + * + * @return [[Column]] object created using the resolved name. + */ + def toColumn: Column = { + if (isNested) { + // No need to quote the string for "as" even if it contains dots. + col(name).as(normalizedName) + } else { + col(quote(name)) + } + } + + /** + * Create a column using the normalized name. 
Since the normalized name is already flattened + * with "dots", it is quoted. + * + * @return [[Column]] object create using the normalized name. + */ + def toNormalizedColumn: Column = col(quote(normalizedName)) + + private def quote(name: String) = s"`$name`" + } + + private[hyperspace] object ResolvedColumn { + private val NESTED_FIELD_PREFIX = "__hs_nested." + + /** + * Given a normalized column name, create [[ResolvedColumn]] after handling the prefix + * for nested columns. + * + * @param normalizedColumnName Normalized column name. + * @return [[ResolvedColumn]] created from the given normalized column name. + */ + def apply(normalizedColumnName: String): ResolvedColumn = { + if (normalizedColumnName.startsWith(NESTED_FIELD_PREFIX)) { + ResolvedColumn(normalizedColumnName.substring(NESTED_FIELD_PREFIX.length), isNested = true) + } else { + ResolvedColumn(normalizedColumnName, isNested = false) + } + } + } + /** * Return available string if required string can be resolved with it, based on spark resolver. * @@ -84,13 +156,13 @@ object ResolverUtils { * @param spark Spark session. * @param requiredStrings List of strings to resolve. * @param plan Logical plan to resolve against. - * @return Optional sequence of tuples of resolved name string and nested state boolean - * if all required strings are resolved. Else, None. + * @return Optional sequence of ResolvedColumn objects if all required strings are resolved. + * Else, None. */ def resolve( spark: SparkSession, requiredStrings: Seq[String], - plan: LogicalPlan): Option[Seq[(String, Boolean)]] = { + plan: LogicalPlan): Option[Seq[ResolvedColumn]] = { val schema = plan.schema val resolver = spark.sessionState.conf.resolver val resolved = requiredStrings.map { requiredField => @@ -99,12 +171,8 @@ object ResolverUtils { .map { expr => val resolvedColNameParts = extractColumnName(expr) validateResolvedColumnName(requiredField, resolvedColNameParts) - getColumnNameFromSchema(schema, resolvedColNameParts, resolver) - .foldLeft(("", false)) { (acc, i) => - val name = Seq(acc._1, i._1).filter(_.nonEmpty).mkString(".") - val isNested = acc._2 || i._2 - (name, isNested) - } + val origColNameParts = getColumnNameFromSchema(schema, resolvedColNameParts, resolver) + ResolvedColumn(origColNameParts.mkString("."), origColNameParts.length > 1) } .getOrElse { return None } } @@ -144,19 +212,19 @@ object ResolverUtils { private def getColumnNameFromSchema( schema: StructType, resolvedColNameParts: Seq[String], - resolver: Resolver): Seq[(String, Boolean)] = resolvedColNameParts match { + resolver: Resolver): Seq[String] = resolvedColNameParts match { case h :: tail => val field = schema.find(f => resolver(f.name, h)).get field match { case StructField(name, s: StructType, _, _) => - (name, true) +: getColumnNameFromSchema(s, tail, resolver) + name +: getColumnNameFromSchema(s, tail, resolver) case StructField(_, _: ArrayType, _, _) => // TODO: Nested arrays will be supported later throw HyperspaceException("Array types are not supported.") case StructField(_, _: MapType, _, _) => // TODO: Nested maps will be supported later throw HyperspaceException("Map types are not supported") - case f => Seq((f.name, false)) + case f => Seq(f.name) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/util/SchemaUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/SchemaUtils.scala deleted file mode 100644 index 1c29062a9..000000000 --- a/src/main/scala/com/microsoft/hyperspace/util/SchemaUtils.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 
(2020) The Hyperspace Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.microsoft.hyperspace.util - -object SchemaUtils { - - val NESTED_FIELD_PREFIX = "__hs_nested." - val NESTED_FIELD_PREFIX_REGEX = "^__hs_nested\\." - - /** - * The method prefixes a nested field name that hasn't already been prefixed. - * The field name must be nested (it should contain a `.` and its type - * should be of [[org.apache.spark.sql.types.StructType]]). - * - * The inverse operation is [[removePrefixNestedFieldName]]. - * - * @param fieldName The nested field name to prefix. - * @return A new prefixed field name. - */ - def prefixNestedFieldName(fieldName: String): String = { - if (fieldName.contains(".") && !fieldName.startsWith(NESTED_FIELD_PREFIX)) { - s"$NESTED_FIELD_PREFIX$fieldName" - } else { - fieldName - } - } - - /** - * The method prefixes the nested field names from a map where the keys are - * the field names and the values are the nested state of that field - * which should be the result of [[ResolverUtils.resolve]]. - * The field names that are not marked as nested will not be changed. - * - * See [[prefixNestedFieldName]] and [[ResolverUtils.resolve]] methods. - * - * @param fieldNames A sequence of tuples of field names and nested status. - * @return A collection with prefixed nested fields. - */ - def prefixNestedFieldNames(fieldNames: Seq[(String, Boolean)]): Seq[String] = { - fieldNames.map { - case (fieldName, true) => - prefixNestedFieldName(fieldName) - case (fieldName, false) => - fieldName - } - } - - /** - * The method removes the prefix from a prefixed nested field name. It returns - * the original nested field name. - * - * The inverse operation is [[prefixNestedFieldName]]. - * - * @param fieldName The prefixed nested field name from which to remove the prefix. - * @return The original field name without prefix. - */ - def removePrefixNestedFieldName(fieldName: String): String = { - fieldName.replaceAll(NESTED_FIELD_PREFIX_REGEX, "") - } - - /** - * The method removes the prefix from a collection of prefixed nested field names. - * It returns the original sequence of tuples of field names and nested state. - * - * The inverse operation is [[prefixNestedFieldNames]]. - * - * @param fieldNames The collection of prefixed field names. - * @return A sequence of tuples of field names and nested status. - */ - def removePrefixNestedFieldNames(fieldNames: Seq[String]): Seq[(String, Boolean)] = { - fieldNames.map { fieldName => - if (SchemaUtils.isFieldNamePrefixed(fieldName)) { - removePrefixNestedFieldName(fieldName) -> true - } else { - fieldName -> false - } - } - } - - /** - * The method checks if the given field name is prefixed. - * - * @param fieldName The field name that to check for prefix. - * @return True if is prefixed otherwise false. 
- */ - def isFieldNamePrefixed(fieldName: String): Boolean = { - fieldName.startsWith(NESTED_FIELD_PREFIX) - } - - /** - * The method checks if the collection of field names contains a nested one - * by checking the prefix. - * - * @param fieldNames The collection of field names to check. - * @return True is at leas one field name is prefixed. - */ - def containsNestedFieldNames(fieldNames: Seq[String]): Boolean = { - fieldNames.exists(isFieldNamePrefixed) - } -} diff --git a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala index 8f4f573d5..7b3e6e54b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexNestedTest.scala @@ -16,7 +16,6 @@ package com.microsoft.hyperspace.index -import scala.collection.immutable.ListMap import scala.collection.mutable.WrappedArray import org.apache.hadoop.conf.Configuration @@ -26,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.functions._ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleNestedData} -import com.microsoft.hyperspace.util.{FileUtils, SchemaUtils} +import com.microsoft.hyperspace.util.FileUtils +import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper { override val systemPath = new Path("src/test/resources/indexLocation") @@ -160,25 +160,23 @@ class CreateIndexNestedTest extends HyperspaceSuite with SQLHelper { // should be added to index schema if they are not already among index config columns. assert( indexRecordsDF.schema.fieldNames.sorted === - (SchemaUtils.prefixNestedFieldNames( - indexConfig2.indexedColumns.zip(Seq(true)) ++ - indexConfig2.includedColumns.zip(Seq(true))) ++ + ((indexConfig2.indexedColumns ++ indexConfig2.includedColumns).map( + ResolvedColumn(_, isNested = true).normalizedName) ++ Seq(IndexConstants.DATA_FILE_NAME_ID) ++ partitionKeys).sorted) } } test("Check lineage in index records for non-partitioned data.") { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { - hyperspace.createIndex(nonPartitionedDataDF, indexConfig1) + hyperspace.createIndex(nonPartitionedDataDF, indexConfig2) val indexRecordsDF = spark.read.parquet( - s"$systemPath/${indexConfig1.indexName}/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") + s"$systemPath/${indexConfig2.indexName}/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") // For non-partitioned data, only file name lineage column should be added to index schema. 
assert( indexRecordsDF.schema.fieldNames.sorted === - (SchemaUtils.prefixNestedFieldNames( - indexConfig1.indexedColumns.zip(Seq(true)) ++ - indexConfig1.includedColumns.zip(Seq(false, true))) ++ + ((indexConfig2.indexedColumns ++ indexConfig2.includedColumns).map( + ResolvedColumn(_, isNested = true).normalizedName) ++ Seq(IndexConstants.DATA_FILE_NAME_ID)).sorted) } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTest.scala b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTest.scala index 9a5914d5e..640f6054d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/CreateIndexTest.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.plans.SQLHelper -import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} +import com.microsoft.hyperspace.{BuildInfo, Hyperspace, HyperspaceException, SampleData, TestUtils} import com.microsoft.hyperspace.util.FileUtils class CreateIndexTest extends HyperspaceSuite with SQLHelper { @@ -241,4 +241,13 @@ class CreateIndexTest extends HyperspaceSuite with SQLHelper { lineageValues.forall(lineageRange.contains(_)) } } + + test("Verify that hyperspace version is written to the log entry.") { + hyperspace.createIndex(nonPartitionedDataDF, indexConfig1) + val logManager = TestUtils.logManager(systemPath, indexConfig1.indexName) + val logEntry = logManager.getLatestLog().get.asInstanceOf[IndexLogEntry] + val version = logEntry.properties.get(IndexConstants.HYPERSPACE_VERSION_PROPERTY) + assert(version.isDefined) + assert(version.get === BuildInfo.version) + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala index 8feaa43b4..c3f5865ba 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala @@ -68,7 +68,11 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { dfFromSample.write.format("delta").save(dataPath) val deltaDf = spark.read.format("delta").load(dataPath) - hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query"))) + + // Enable lineage column for Hybrid Scan delete support to test time travel. + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query"))) + } withIndex("deltaIndex") { def query(version: Option[Long] = None): DataFrame = { @@ -102,7 +106,8 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { // The index should not be applied for the version at index creation. assert(!isIndexUsed(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex")) - withSQLConf(TestConfig.HybridScanEnabledAppendOnly: _*) { + // Enable Hybrid Scan for time travel query validation. + withSQLConf(TestConfig.HybridScanEnabled: _*) { // The index should be applied for the updated version. 
assert(isIndexUsed(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex/v__=0")) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/FileIdTrackerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/FileIdTrackerTest.scala new file mode 100644 index 000000000..ea602a8f0 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/FileIdTrackerTest.scala @@ -0,0 +1,100 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.SparkFunSuite + +import com.microsoft.hyperspace.HyperspaceException + +class FileIdTrackerTest extends SparkFunSuite { + test("getMaxFileId returns -1 on a new instance.") { + val tracker = new FileIdTracker + assert(tracker.getMaxFileId == -1) + } + + test("getFileToIdMapping is empty on a new instance.") { + val tracker = new FileIdTracker + assert(tracker.getFileToIdMapping.isEmpty) + } + + test("getFileId returns None on unseen file properties.") { + val tracker = new FileIdTracker + assert(tracker.getFileId(path = "abc", size = 123, modifiedTime = 555).isEmpty) + } + + test("addFileInfo does nothing with an empty files set.") { + val tracker = new FileIdTracker + tracker.addFileInfo(Set[FileInfo]()) + assert(tracker.getFileToIdMapping.isEmpty) + assert(tracker.getMaxFileId == -1) + } + + test("addFileInfo throws an exception if there is a FileInfo having an invalid file id.") { + val tracker = new FileIdTracker + val ex = intercept[HyperspaceException] { + tracker.addFileInfo(Set(FileInfo("abc", 123, 555, IndexConstants.UNKNOWN_FILE_ID))) + } + assert(ex.getMessage.contains("Cannot add file info with unknown id")) + } + + test( + "addFileInfo throws an exception if there is a conflict but modifications " + + "before the exception are retained.") { + val tracker = new FileIdTracker + tracker.addFileInfo(Set(FileInfo("def", 123, 555, 10))) + val ex = intercept[HyperspaceException] { + implicit def ordering: Ordering[FileInfo] = new Ordering[FileInfo] { + override def compare(x: FileInfo, y: FileInfo): Int = { + x.name.compareTo(y.name) + } + } + tracker.addFileInfo(scala.collection.immutable.SortedSet( + FileInfo("abc", 100, 555, 15), + FileInfo("def", 123, 555, 11))) + } + assert(ex.getMessage.contains("Adding file info with a conflicting id")) + assert(tracker.getFileId("abc", 100, 555).contains(15)) + } + + test("addFileInfo puts new records in the map and increase the max id on success.") { + val tracker = new FileIdTracker + tracker.addFileInfo(Set(FileInfo("abc", 123, 555, 10), FileInfo("def", 234, 777, 5))) + assert(tracker.getFileId("abc", 123, 555).contains(10)) + assert(tracker.getFileId("def", 234, 777).contains(5)) + assert(tracker.getMaxFileId == 10) + } + + test("addFile returns the existing id and max id is unchanged.") { + val tracker = new FileIdTracker + tracker.addFileInfo(Set(FileInfo("abc", 123, 555, 10))) + assert(tracker.getMaxFileId == 10) + 
assert(tracker.addFile(new FileStatus(123, false, 3, 1, 555, new Path("abc"))) == 10) + assert(tracker.getMaxFileId == 10) + } + + test("addFile returns a new id and max id is updated.") { + val tracker = new FileIdTracker + assert(tracker.addFile(new FileStatus(123, false, 3, 1, 555, new Path("abc"))) == 0) + assert(tracker.addFile(new FileStatus(123, false, 3, 1, 555, new Path("def"))) == 1) + assert(tracker.addFile(new FileStatus(124, false, 3, 1, 777, new Path("xyz"))) == 2) + assert(tracker.getMaxFileId == 2) + assert(tracker.getFileId("abc", 123, 555).contains(0)) + assert(tracker.getFileId("def", 123, 555).contains(1)) + assert(tracker.getFileId("xyz", 124, 777).contains(2)) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala index 3e2771882..f2ffc5db0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala @@ -107,7 +107,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite { indexName: String, expectedDeletedFileNames: Seq[String]): Unit = { - val fileNameToId = getLatestStableLog(indexName).fileIdTracker.getFileToIdMap.toSeq.map { + val fileNameToId = getLatestStableLog(indexName).fileIdTracker.getFileToIdMapping.toSeq.map { kv => (kv._1._1, kv._2) }.toMap diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala index 13dc15a88..a96540f86 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala @@ -37,7 +37,7 @@ class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite { override def createLatestStableLog(id: Int): Boolean = throw new NotImplementedError override def deleteLatestStableLog(): Boolean = throw new NotImplementedError override def writeLog(id: Int, log: LogEntry): Boolean = throw new NotImplementedError - override def getLatestLog(): Option[LogEntry] = Some(testLogEntry) + override def getIndexVersions(states: Seq[String]): Seq[Int] = Seq(0) private val testLogEntry: IndexLogEntry = { val sourcePlanProperties = SparkPlan.Properties( diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index ebe4a5eec..c4936d88f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.scalatest.BeforeAndAfter -import com.microsoft.hyperspace.{HyperspaceException, TestUtils} +import com.microsoft.hyperspace.{BuildInfo, HyperspaceException, TestUtils} import com.microsoft.hyperspace.index.IndexConstants.UNKNOWN_FILE_ID import com.microsoft.hyperspace.util.{JsonUtils, PathUtils} @@ -176,7 +176,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter | "kind" : "Spark" | } | }, - | "properties" : { }, + | "properties" : { + | "${IndexConstants.HYPERSPACE_VERSION_PROPERTY}" : "${BuildInfo.version}" + | }, | "version" : "0.1", | "id" : 0, | "state" : "ACTIVE", @@ -220,7 +222,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with 
BeforeAndAfter .Properties(Seq(Signature("provider", "signatureValue"))) )) - val expected = IndexLogEntry( + val expected = IndexLogEntry.create( "indexName", CoveringIndex( CoveringIndex.Properties( diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala index 789c1de32..d773a9eac 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala @@ -142,6 +142,10 @@ class IndexLogManagerImplTest fs, new Path(path, s"$HYPERSPACE_LOG/0"), JsonUtils.toJson(getEntry("CREATING"))) + FileUtils.createFile( + fs, + new Path(path, s"$HYPERSPACE_LOG/1"), + JsonUtils.toJson(getEntry("ACTIVE"))) FileUtils.createFile( fs, new Path(path, s"$HYPERSPACE_LOG/3"), @@ -158,6 +162,8 @@ class IndexLogManagerImplTest val expected = Some(getEntry("ACTIVE")) val actual = new IndexLogManagerImpl(path).getLatestStableLog() assert(actual.equals(expected)) + val actualActiveVersions = new IndexLogManagerImpl(path).getIndexVersions(Seq("ACTIVE")) + assert(actualActiveVersions.equals(Seq(3, 1))) } test("testUpdateLatestStableLog passes if latestStable.json can be created") { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala index 6517b4629..35a04a908 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTest.scala @@ -785,7 +785,7 @@ class IndexManagerTest extends HyperspaceSuite with SQLHelper { LogicalPlanFingerprint.Properties( Seq(Signature(LogicalPlanSignatureProvider.create().name, s))))) - val entry = IndexLogEntry( + val entry = IndexLogEntry.create( indexConfig.indexName, CoveringIndex( CoveringIndex.Properties( diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexStatisticsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexStatisticsTest.scala index 545620f84..6525a6f2a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexStatisticsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexStatisticsTest.scala @@ -62,8 +62,7 @@ class IndexStatisticsTest extends QueryTest with HyperspaceSuite { } } - test( - "index() on an index refreshed in incremental or quick mode returns correct result.") { + test("index() on an index refreshed in incremental or quick mode returns correct result.") { Seq("incremental", "quick").foreach { mode => withTempPathAsString { testPath => withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala index a8364d42a..872d29ec9 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexNestedTest.scala @@ -94,14 +94,14 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Validate only index records whose lineage is the deleted file are removed. 
val originalIndexDF = spark.read.parquet(s"$systemPath/${indexConfig.indexName}/" + - s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") val originalIndexWithoutDeletedFile = originalIndexDF - .filter(s"""${IndexConstants.DATA_FILE_NAME_ID} != ${fileId.get}""") + .filter(s"""${IndexConstants.DATA_FILE_NAME_ID} != ${fileId.get}""") hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) val refreshedIndexDF = spark.read.parquet(s"$systemPath/${indexConfig.indexName}/" + - s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") checkAnswer(originalIndexWithoutDeletedFile, refreshedIndexDF) } @@ -111,7 +111,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate incremental refresh index (to handle deletes from the source data) " + - "fails as expected on an index without lineage.") { + "fails as expected on an index without lineage.") { SampleNestedData.save( spark, nonPartitionedDataPath, @@ -127,13 +127,13 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)) assert( ex.getMessage.contains(s"Index refresh (to handle deleted source data) is " + - "only supported on an index with lineage.")) + "only supported on an index with lineage.")) } } test( "Validate incremental refresh index is a no-op if no source data file is deleted or " + - "appended.") { + "appended.") { SampleNestedData.save( spark, nonPartitionedDataPath, @@ -153,8 +153,8 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Check emitted events. MockEventLogger.emittedEvents match { case Seq( - RefreshIncrementalActionEvent(_, _, "Operation started."), - RefreshIncrementalActionEvent(_, _, msg)) => + RefreshIncrementalActionEvent(_, _, "Operation started."), + RefreshIncrementalActionEvent(_, _, msg)) => assert(msg.contains("Refresh incremental aborted as no source data change found.")) case _ => fail() } @@ -163,7 +163,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate incremental refresh index (to handle deletes from the source data) " + - "fails as expected when all source data files are deleted.") { + "fails as expected when all source data files are deleted.") { Seq(true, false).foreach { deleteDataFolder => withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { SampleNestedData.save( @@ -184,9 +184,9 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { } else { val dataPath = new Path(nonPartitionedDataPath, "*parquet") dataPath - .getFileSystem(new Configuration) - .globStatus(dataPath) - .foreach(p => FileUtils.delete(p.getPath)) + .getFileSystem(new Configuration) + .globStatus(dataPath) + .foreach(p => FileUtils.delete(p.getPath)) val ex = intercept[HyperspaceException]( @@ -201,7 +201,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate incremental refresh index (to handle deletes from the source data) " + - "works well when file info for an existing source data file changes.") { + "works well when file info for an existing source data file changes.") { SampleNestedData.save( spark, nonPartitionedDataPath, @@ -230,7 +230,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { spark, IndexLogManagerFactoryImpl.create(indexPath), IndexDataManagerFactoryImpl.create(indexPath)) - .run() + .run() { // Check the index 
log entry after RefreshIncrementalAction. @@ -253,7 +253,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { spark, IndexLogManagerFactoryImpl.create(indexPath), IndexDataManagerFactoryImpl.create(indexPath)) - .run() + .run() { // Check non-empty deletedFiles after RefreshIncrementalAction. @@ -270,11 +270,13 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate RefreshIncrementalAction updates appended and deleted files in metadata " + - "as expected, when some file gets deleted and some appended to source data.") { + "as expected, when some file gets deleted and some appended to source data.") { withTempPathAsString { testPath => withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { - SampleNestedData.save(spark, testPath, + SampleNestedData.save( + spark, + testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks", "nested")) val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) @@ -287,18 +289,18 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Add some new data to source. import spark.implicits._ SampleNestedData.testData - .take(3) - .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") - .write - .mode("append") - .parquet(testPath) + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") + .write + .mode("append") + .parquet(testPath) val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") new RefreshIncrementalAction( spark, IndexLogManagerFactoryImpl.create(indexPath), IndexDataManagerFactoryImpl.create(indexPath)) - .run() + .run() // Verify "appendedFiles" is cleared and "deletedFiles" is updated after refresh. val indexLogEntry = getLatestStableLog(indexConfig.indexName) @@ -319,12 +321,14 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate incremental refresh index when some file gets deleted and some appended to " + - "source data.") { + "source data.") { withTempPathAsString { testPath => withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { // Save test data non-partitioned. - SampleNestedData.save(spark, testPath, + SampleNestedData.save( + spark, + testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks", "nested")) val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) @@ -338,11 +342,11 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Add some new data to source. import spark.implicits._ SampleNestedData.testData - .take(3) - .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") - .write - .mode("append") - .parquet(testPath) + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") + .write + .mode("append") + .parquet(testPath) val countAfterAppend = spark.read.parquet(testPath).count() assert(countAfterDelete + 3 == countAfterAppend) @@ -351,8 +355,8 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Check if refreshed index is updated appropriately. 
val indexDf = spark.read - .parquet(s"$systemPath/${indexConfig.indexName}/" + - s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") + .parquet(s"$systemPath/${indexConfig.indexName}/" + + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") assert(indexDf.count() == countAfterAppend) } @@ -362,9 +366,11 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate the configs for incremental index data is consistent with" + - "the previous version.") { + "the previous version.") { withTempPathAsString { testPath => - SampleNestedData.save(spark, testPath, + SampleNestedData.save( + spark, + testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks", "nested")) val df = spark.read.parquet(testPath) @@ -377,11 +383,11 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Add some new data to source. import spark.implicits._ SampleNestedData.testData - .take(3) - .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") - .write - .mode("append") - .parquet(testPath) + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") + .write + .mode("append") + .parquet(testPath) withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", @@ -397,11 +403,13 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { test( "Validate RefreshQuickAction updates appended and deleted files in metadata " + - "as expected, when some file gets deleted and some appended to source data.") { + "as expected, when some file gets deleted and some appended to source data.") { withTempPathAsString { testPath => withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { - SampleNestedData.save(spark, testPath, + SampleNestedData.save( + spark, + testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks", "nested")) val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) @@ -414,11 +422,11 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { // Add some new data to source. import spark.implicits._ SampleNestedData.testData - .take(3) - .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") - .write - .mode("append") - .parquet(testPath) + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks", "nested") + .write + .mode("append") + .parquet(testPath) val prevIndexLogEntry = getLatestStableLog(indexConfig.indexName) @@ -427,7 +435,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { spark, IndexLogManagerFactoryImpl.create(indexPath), IndexDataManagerFactoryImpl.create(indexPath)) - .run() + .run() val indexLogEntry = getLatestStableLog(indexConfig.indexName) val latestFiles = listFiles(testPath, getFileIdTracker(systemPath, indexConfig)).toSet @@ -443,14 +451,14 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { assert(indexLogEntry.sourceUpdate.isDefined) assert( indexLogEntry.source.plan.properties.fingerprint.properties.signatures.head.value - == expectedLatestSignature) + == expectedLatestSignature) assert(indexLogEntry.appendedFiles === expectedAppendedFiles) assert(indexLogEntry.deletedFiles === expectedDeletedFiles) // Check index data files and source data files are not updated. 
assert( indexLogEntry.relations.head.data.properties.content.fileInfos - === prevIndexLogEntry.relations.head.data.properties.content.fileInfos) + === prevIndexLogEntry.relations.head.data.properties.content.fileInfos) assert(indexLogEntry.content.fileInfos === prevIndexLogEntry.content.fileInfos) } } @@ -473,9 +481,9 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { val absolutePath = PathUtils.makeAbsolute(path) val fs = absolutePath.getFileSystem(new Configuration) fs.listStatus(absolutePath) - .toSeq - .filter(f => DataPathFilter.accept(f.getPath)) - .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true)) + .toSeq + .filter(f => DataPathFilter.accept(f.getPath)) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true)) } private def getLatestStableLog(indexName: String): IndexLogEntry = { @@ -490,7 +498,7 @@ class RefreshIndexNestedTest extends QueryTest with HyperspaceSuite { version: Int, allowEmpty: Boolean = false) = { val cnt = entry.content.fileInfos - .count(_.name.contains(s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version")) + .count(_.name.contains(s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version")) assert(allowEmpty || cnt > 0) cnt } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala index f49d8ca2d..95c777a18 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleSuite.scala @@ -58,7 +58,7 @@ trait HyperspaceRuleSuite extends HyperspaceSuite { new FileStatus(10, false, 1, 10, 10, path) } - val indexLogEntry = IndexLogEntry( + val indexLogEntry = IndexLogEntry.create( name, CoveringIndex( CoveringIndex.Properties( diff --git a/src/test/scala/com/microsoft/hyperspace/util/ResolverUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/util/ResolverUtilsTest.scala index 923cb5fb7..7f0ba8c4a 100644 --- a/src/test/scala/com/microsoft/hyperspace/util/ResolverUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/util/ResolverUtilsTest.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace.util import org.apache.spark.SparkFunSuite import com.microsoft.hyperspace.{HyperspaceException, SparkInvolvedSuite} +import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn class ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { @@ -31,13 +32,13 @@ class ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { assert( ResolverUtils .resolve(spark, Seq("id", "name"), df.queryExecution.analyzed) - .contains(Seq(("id", false), ("name", false)))) + .contains(Seq(ResolvedColumn("id", false), ResolvedColumn("name", false)))) assert( ResolverUtils.resolve(spark, Seq("unknown", "name"), df.queryExecution.analyzed).isEmpty) assert( ResolverUtils .resolve(spark, Seq.empty[String], df.queryExecution.analyzed) - .contains(Seq.empty[(String, Boolean)])) + .contains(Seq.empty[ResolvedColumn])) } test("Verify testResolve against dataframe - case sensitiveness false") { @@ -49,7 +50,7 @@ class ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { assert( ResolverUtils .resolve(spark, Seq("iD", "nAme"), df.queryExecution.analyzed) - .contains(Seq(("Id", false), ("Name", false)))) + .contains(Seq(ResolvedColumn("Id", false), ResolvedColumn("Name", false)))) } test("Verify testResolve against dataframe - case sensitiveness true") { @@ -74,7 +75,7 @@ class 
ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { assert( ResolverUtils .resolve(spark, Seq("id", "nm"), df.queryExecution.analyzed) - .contains(Seq(("id", false), ("nm", false)))) + .contains(Seq(ResolvedColumn("id", false), ResolvedColumn("nm", false)))) assert( ResolverUtils .resolve( @@ -83,10 +84,10 @@ class ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { df.queryExecution.analyzed) .contains( Seq( - ("nm", false), - ("nested.n.n.n.f2", true), - ("nested.n.n.nf1_b", true), - ("nested.nf1", true)))) + ResolvedColumn("nm", false), + ResolvedColumn("nested.n.n.n.f2", true), + ResolvedColumn("nested.n.n.nf1_b", true), + ResolvedColumn("nested.nf1", true)))) } test("Verify testResolve against dataframe - unsupported nested field names") { @@ -98,7 +99,11 @@ class ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { assert( ResolverUtils .resolve(spark, Seq("id", "nm", "nested.n__y"), df.queryExecution.analyzed) - .contains(Seq(("id", false), ("nm", false), ("nested.n__y", true)))) + .contains( + Seq( + ResolvedColumn("id", false), + ResolvedColumn("nm", false), + ResolvedColumn("nested.n__y", true)))) val exc = intercept[HyperspaceException] { ResolverUtils.resolve(spark, Seq("nm", "nested.`nf9.x`"), df.queryExecution.analyzed) } @@ -133,6 +138,31 @@ class ResolverUtilsTest extends SparkFunSuite with SparkInvolvedSuite { } assert(exc2.getMessage.contains("Map types are not supported.")) } + + test("Verify ResolvedColumn functionality") { + val col1 = ResolvedColumn("a.b.c", isNested = false) + assert(col1.name == "a.b.c") + assert(col1.normalizedName == "a.b.c") + assert(col1 === ResolvedColumn(col1.normalizedName)) + // toString() calls toPrettySQL() internally and this verifies we are creating the + // `Column` object correctly. + assert(col1.toColumn.toString === "`a.b.c`") + assert(col1.toNormalizedColumn.toString === "`a.b.c`") + + val col2 = ResolvedColumn("abc", isNested = false) + assert(col2.name == "abc") + assert(col2.normalizedName == "abc") + assert(col2 === ResolvedColumn(col2.normalizedName)) + assert(col2.toColumn.toString === "abc") + assert(col2.toNormalizedColumn.toString === "abc") + + val col3 = ResolvedColumn("a.b.c", isNested = true) + assert(col3.name == "a.b.c") + assert(col3.normalizedName == "__hs_nested.a.b.c") + assert(col3 === ResolvedColumn(col3.normalizedName)) + assert(col3.toColumn.toString === "a.b.c AS `__hs_nested.a.b.c`") + assert(col3.toNormalizedColumn.toString === "`__hs_nested.a.b.c`") + } } case class NType8(f1: String, maps: Map[NType, NType]) diff --git a/src/test/scala/com/microsoft/hyperspace/util/SchemaUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/util/SchemaUtilsTest.scala deleted file mode 100644 index 133e61bff..000000000 --- a/src/test/scala/com/microsoft/hyperspace/util/SchemaUtilsTest.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (2020) The Hyperspace Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.microsoft.hyperspace.util - -import org.apache.spark.SparkFunSuite - -import com.microsoft.hyperspace.SparkInvolvedSuite - -class SchemaUtilsTest extends SparkFunSuite with SparkInvolvedSuite { - - val originals = Seq[(String, Boolean)]( - ("id", false), - ("a.b.c", true), - ("__hs_nested", false), - ("__hs_nested_a", false), - ("a.__hs_nested", true), - ("a.__hs_nested.b", true), - ("a.nested..b", true), - ("a.`g.c`.b", true), - ("a.g-c.b", true), - ("a-b", false)) - val prefixed = Seq( - "id", - "__hs_nested.a.b.c", - "__hs_nested", - "__hs_nested_a", - "__hs_nested.a.__hs_nested", - "__hs_nested.a.__hs_nested.b", - "__hs_nested.a.nested..b", - "__hs_nested.a.`g.c`.b", - "__hs_nested.a.g-c.b", - "a-b") - - test("prefixNestedFieldName - default behavior") { - originals.zipWithIndex.foreach { - case (v, i) => - assert(SchemaUtils.prefixNestedFieldName(v._1) == prefixed(i)) - } - } - - test("prefixNestedFieldName - already prefixed") { - assert( - SchemaUtils.prefixNestedFieldName("__hs_nested.already.prefixed") == - "__hs_nested.already.prefixed") - } - - test("prefixNestedFieldNames") { - assert(prefixed == SchemaUtils.prefixNestedFieldNames(originals)) - } - - test("removePrefixNestedFieldName") { - prefixed.zipWithIndex.foreach { - case (v, i) => - assert(SchemaUtils.removePrefixNestedFieldName(v) == originals.toSeq(i)._1) - } - } - - test("removePrefixNestedFieldNames") { - assert(originals == SchemaUtils.removePrefixNestedFieldNames(prefixed)) - } - - test("isFieldNamePrefixed") { - val expectedBooleans1 = - Seq(false, false, false, false, false, false, false, false, false, false, false) - originals.zipWithIndex.foreach { - case (v, i) => - assert(SchemaUtils.isFieldNamePrefixed(v._1) == expectedBooleans1(i)) - } - val expectedBooleans2 = - Seq(false, true, false, false, true, true, true, true, true, false) - prefixed.zipWithIndex.foreach { - case (v, i) => - assert(SchemaUtils.isFieldNamePrefixed(v) == expectedBooleans2(i)) - } - } - - test("containsNestedFieldNames") { - assert(!SchemaUtils.containsNestedFieldNames(originals.map(_._1))) - assert(SchemaUtils.containsNestedFieldNames(prefixed)) - } -}
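For reference, the reworked FileBasedRelationMetadata contract above no longer takes a Relation argument and no longer wraps its result in Option: an implementation receives the index properties accumulated so far and returns the enriched map directly. Below is a minimal, dependency-free sketch of that shape; ExampleRelationMetadata and the "example.*" property keys are hypothetical placeholders for illustration, not Hyperspace APIs.

// Illustrative stand-in for the new enrichIndexProperties contract.
// ExampleRelationMetadata and the "example.*" keys are made up for this sketch.
object EnrichIndexPropertiesSketch {

  class ExampleRelationMetadata(formatName: String, sourceVersion: Long) {

    // Mirrors the shape of FileBasedRelationMetadata.internalFileFormatName.
    def internalFileFormatName(): String = formatName

    // New shape: properties in, enriched properties out (no Relation, no Option).
    def enrichIndexProperties(properties: Map[String, String]): Map[String, String] =
      properties +
        ("example.sourceFormat" -> formatName) +
        ("example.sourceVersion" -> sourceVersion.toString)
  }

  def main(args: Array[String]): Unit = {
    val metadata = new ExampleRelationMetadata("parquet", sourceVersion = 7L)
    val enriched = metadata.enrichIndexProperties(Map("indexLogVersion" -> "1"))
    println(enriched)
    // Map(indexLogVersion -> 1, example.sourceFormat -> parquet, example.sourceVersion -> 7)
  }
}

Returning the map directly rather than Option[Map[String, String]] removes a layer of unwrapping for callers; an implementation that has nothing to add simply returns its input unchanged.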
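ResolvedColumn in ResolverUtils now owns the "__hs_nested." naming convention that SchemaUtils used to apply: a nested column keeps its flattened, dotted name internally and exposes a prefixed normalizedName, and ResolvedColumn.apply on a normalized name recovers the original name and nested flag. The snippet below is a minimal, Spark-free mirror of that round trip; the real class additionally builds Column objects via toColumn and toNormalizedColumn, with backtick quoting.

// A dependency-free mirror of the ResolvedColumn normalization convention,
// shown only to illustrate the "__hs_nested." prefix round trip.
object ResolvedColumnSketch {
  val NestedFieldPrefix = "__hs_nested."

  final case class ResolvedColumn(name: String, isNested: Boolean) {
    // Nested columns are flattened with dots and prefixed; top-level names pass through.
    def normalizedName: String =
      if (isNested) s"$NestedFieldPrefix$name" else name
  }

  // Round trip: recover the ResolvedColumn from a normalized (possibly prefixed) name.
  def fromNormalized(normalizedName: String): ResolvedColumn =
    if (normalizedName.startsWith(NestedFieldPrefix)) {
      ResolvedColumn(normalizedName.stripPrefix(NestedFieldPrefix), isNested = true)
    } else {
      ResolvedColumn(normalizedName, isNested = false)
    }

  def main(args: Array[String]): Unit = {
    val nested = ResolvedColumn("nested.nf1", isNested = true)
    assert(nested.normalizedName == "__hs_nested.nested.nf1")
    assert(fromNormalized(nested.normalizedName) == nested)

    val topLevel = ResolvedColumn("clicks", isNested = false)
    assert(topLevel.normalizedName == "clicks")
    assert(fromNormalized(topLevel.normalizedName) == topLevel)
    println("normalization round trip holds")
  }
}

The prefix is what lets an index schema tell a flattened nested field such as __hs_nested.nested.nf1 apart from an ordinary top-level column whose name happens to contain dots, which is also why the real class asserts that a nested name contains a dot, is not already prefixed, and carries no backticks.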
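The new FileIdTrackerTest pins down the tracker's id assignment: a fresh tracker reports max file id -1, addFile hands out ids 0, 1, 2, ... keyed by (path, size, modified time), and re-adding a known file returns its existing id without changing the max. The following compact stand-in mirrors only that bookkeeping and assumes plain string paths instead of Hadoop FileStatus; it is not the real FileIdTracker, which also supports addFileInfo with pre-assigned ids and conflict checks.

// Sketch of FileIdTracker-style id assignment, inferred from FileIdTrackerTest.
// Keys are (path, size, modifiedTime); ids are assigned sequentially from 0.
class FileIdTrackerSketch {
  private var maxId: Long = -1L
  private val idByKey = scala.collection.mutable.Map.empty[(String, Long, Long), Long]

  def getMaxFileId: Long = maxId

  def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
    idByKey.get((path, size, modifiedTime))

  // Returns the existing id for a known file, otherwise assigns maxId + 1.
  def addFile(path: String, size: Long, modifiedTime: Long): Long =
    idByKey.getOrElseUpdate((path, size, modifiedTime), { maxId += 1; maxId })
}

With this sketch, calling addFile("abc", 123, 555) twice yields the same id 0 and leaves getMaxFileId at 0, matching the behavior the tests assert for the real tracker.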