
Commit

fixing disabling col stats and index definition update
nsivabalan committed Dec 20, 2024
1 parent 8e746be commit 749f803
Showing 6 changed files with 40 additions and 12 deletions.
File 1 of 6
@@ -285,6 +285,12 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieC
}
}

/**
* Updates the cols being indexed with column stats. This is for tracking purposes, so that queries can leverage col stats from MDT only for
* indexed columns.
* @param metaClient instance of {@link HoodieTableMetaClient} of interest.
* @param columnsToIndex list of columns to index.
*/
protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
}
}
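
For context: the hook above is a no-op in the base class, which suggests it is an engine-specific override point. A minimal sketch of what such an override could look like, assuming a hypothetical persistColsToIndex helper (not a confirmed Hudi API; only the hook's signature comes from this commit):

import java.util.List;

import org.apache.hudi.common.table.HoodieTableMetaClient;

// Hypothetical subclass for illustration only; the delegation below is an
// assumption, not the actual engine implementation.
class ExampleIndexTrackingWriteClient {

  protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient,
                                                  List<String> columnsToIndex) {
    // Record which columns are indexed so that queries only trust col stats
    // from the metadata table (MDT) for those columns.
    persistColsToIndex(metaClient, columnsToIndex);
  }

  // Hypothetical helper; a real implementation would update the col stats
  // index definition's source fields.
  private void persistColsToIndex(HoodieTableMetaClient metaClient, List<String> cols) {
    // intentionally left as a sketch
  }
}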
File 2 of 6: HoodieIndexClientUtils.java
@@ -32,6 +32,8 @@

import java.util.List;

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;

public class HoodieIndexClientUtils {

public static void updateColsToIndex(HoodieTable dataTable, HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
@@ -61,4 +63,8 @@ public static void updateColsToIndex(HoodieTable dataTable, HoodieWriteConfig co
}
}
}

public static void deleteColStatsIndexDefn(HoodieTableMetaClient dataTableMetaClient) {
dataTableMetaClient.deleteIndexDefinition(PARTITION_NAME_COLUMN_STATS);
}
}
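
For context, a minimal usage sketch of the new helper. deleteColStatsIndexDefn, HoodieTableMetaClient.reload, getIndexMetadata and getIndexDefinitions all appear elsewhere in this commit; the wrapper class and the exception choice are illustrative:

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;

import org.apache.hudi.client.HoodieIndexClientUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public final class DeleteColStatsDefnExample {
  // Drops the col stats index definition from the data table and verifies
  // it is gone, mirroring the new assertion in ColumnStatIndexTestBase.
  public static void dropAndVerify(HoodieTableMetaClient metaClient) {
    HoodieIndexClientUtils.deleteColStatsIndexDefn(metaClient);
    HoodieTableMetaClient reloaded = HoodieTableMetaClient.reload(metaClient);
    boolean stillPresent = reloaded.getIndexMetadata().isPresent()
        && reloaded.getIndexMetadata().get().getIndexDefinitions()
            .containsKey(PARTITION_NAME_COLUMN_STATS);
    if (stillPresent) {
      throw new IllegalStateException("col stats index definition was not deleted");
    }
  }
}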
File 3 of 6
@@ -31,6 +31,7 @@
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.HoodieIndexClientUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -1029,6 +1030,10 @@ public void deleteMetadataIndexIfNecessary() {
if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType.getPartitionPath())) {
deleteMetadataPartition(metaClient.getBasePath(), context, partitionType.getPartitionPath());
}
if (partitionType == MetadataPartitionType.COLUMN_STATS) {
// delete index definition as well
HoodieIndexClientUtils.deleteColStatsIndexDefn(getMetaClient());
}
clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
} catch (HoodieMetadataException e) {
throw new HoodieException("Failed to delete metadata partition: " + partitionType.name(), e);
File 4 of 6: HoodieSparkIndexClient.java
@@ -65,8 +65,6 @@ public class HoodieSparkIndexClient extends BaseHoodieIndexClient {

private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkIndexClient.class);

private static volatile HoodieSparkIndexClient _instance;

private Option<SparkSession> sparkSessionOpt = Option.empty();
private Option<HoodieWriteConfig> writeConfigOpt = Option.empty();
private Option<HoodieEngineContext> engineContextOpt = Option.empty();
@@ -136,15 +134,6 @@ private SparkRDDWriteClient getWriteClient(HoodieTableMetaClient metaClient) {
}
});

/*sparkSessionOpt.get().sqlContext().getAllConfs().foreach(new Function1<Tuple2<String, String>, Void>() {
@Override
public Void apply(Tuple2<String, String> v1) {
if (v1._1.startsWith("hoodie.")) {
typedProperties.put(v1._1, v1._2);
}
return Void;
});*/

HoodieWriteConfig localWriteConfig = HoodieWriteConfig.newBuilder()
.withPath(metaClient.getBasePath())
.withProperties(typedProperties)
File 5 of 6: ColumnStatIndexTestBase.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.functions.{lit, struct, typedLit}
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.dsl.expressions.StringToAttributeConversionHelper
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.junit.jupiter.api._
import org.junit.jupiter.params.provider.Arguments

@@ -234,6 +234,10 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
assertEquals(expectedColsToIndex, indexDefn.getSourceFields.asScala.toSeq)
}

protected def validateNonExistentColumnsToIndexDefn(metaClient: HoodieTableMetaClient): Unit = {
assertFalse(metaClient.getIndexMetadata.get().getIndexDefinitions.containsKey(PARTITION_NAME_COLUMN_STATS))
}

protected def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
metadataOpts: Map[String, String],
expectedColStatsSourcePath: String,
File 6 of 6: TestColumnStatsIndex.scala
@@ -156,6 +156,24 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {

validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD,
HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c7","c8"))

// disable col stats, while still passing an explicit list of columns to index.
val metadataOpts3 = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false",
HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key() -> "c1,c2,c3,c5,c7" // drops c8 from the previously indexed set.
)
// this write, with col stats disabled, should also delete the index definition.
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts3, commonOpts,
dataSourcePath = "index/colstats/update6-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append,
shouldValidate = false,
shouldValidateManually = false))

metaClient = HoodieTableMetaClient.reload(metaClient)
validateNonExistentColumnsToIndexDefn(metaClient)
}
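
For context, the scenario this test exercises boils down to re-running a write with col stats disabled. A minimal sketch of those writer options, assuming the usual Hudi Spark write path; only the config constants shown in the test above are used:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.common.config.HoodieMetadataConfig;

public final class DisableColStatsOpts {
  // Options for the next write: metadata table stays on, col stats index is
  // disabled. With this fix, such a write also deletes the col stats index
  // definition from the table's index metadata.
  public static Map<String, String> disableColStats() {
    Map<String, String> opts = new HashMap<>();
    opts.put(HoodieMetadataConfig.ENABLE.key(), "true");
    opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "false");
    return opts;
  }
}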

