HIVE-27190: Implement col stats cache for hive iceberg table #6380
base: master
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;

 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.plan.RelOptSchema;

@@ -82,7 +81,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;

 public class RelOptHiveTable implements RelOptTable {

@@ -125,7 +123,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, RelDataTypeFactory typeFactor
     this.schema = calciteSchema;
     this.typeFactory = typeFactory;
     this.qualifiedTblName = ImmutableList.copyOf(qualifiedTblName);
-    this.name = this.qualifiedTblName.stream().collect(Collectors.joining("."));
+    this.name = String.join(".", this.qualifiedTblName);
     this.rowType = rowType;
     this.hiveTblMetadata = hiveTblMetadata;
     this.hiveColStatsMap = new HashMap<>();

@@ -192,15 +190,15 @@ public List<ColumnStrategy> getColumnStrategies() {
   public RelOptHiveTable copy(RelDataType newRowType) {
     // 1. Build map of column name to col index of original schema
     // Assumption: Hive Table can not contain duplicate column names
-    Map<String, Integer> nameToColIndxMap = new HashMap<String, Integer>();
+    Map<String, Integer> nameToColIndxMap = new HashMap<>();
     for (RelDataTypeField f : this.rowType.getFieldList()) {
       nameToColIndxMap.put(f.getName(), f.getIndex());
     }

     // 2. Build nonPart/Part/Virtual column info for new RowSchema
-    List<ColumnInfo> newHiveNonPartitionCols = new ArrayList<ColumnInfo>();
-    List<ColumnInfo> newHivePartitionCols = new ArrayList<ColumnInfo>();
-    List<VirtualColumn> newHiveVirtualCols = new ArrayList<VirtualColumn>();
+    List<ColumnInfo> newHiveNonPartitionCols = new ArrayList<>();
+    List<ColumnInfo> newHivePartitionCols = new ArrayList<>();
+    List<VirtualColumn> newHiveVirtualCols = new ArrayList<>();
     Map<Integer, VirtualColumn> virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols,
         this.noOfNonVirtualCols);
     Integer originalColIndx;

@@ -329,8 +327,8 @@ private List<RelReferentialConstraint> generateReferentialConstraints() {
     ImmutableList.Builder<RelReferentialConstraint> builder = ImmutableList.builder();
     if (foreignKeyInfo != null && !foreignKeyInfo.getForeignKeys().isEmpty()) {
       for (List<ForeignKeyCol> fkCols : foreignKeyInfo.getForeignKeys().values()) {
-        String parentDatabaseName = fkCols.get(0).parentDatabaseName;
-        String parentTableName = fkCols.get(0).parentTableName;
+        String parentDatabaseName = fkCols.getFirst().parentDatabaseName;
+        String parentTableName = fkCols.getFirst().parentTableName;
         String qualifiedName;
         List<String> parentTableQualifiedName = new ArrayList<>();
         if (parentDatabaseName != null && !parentDatabaseName.isEmpty()) {

@@ -390,7 +388,7 @@ public <T> T unwrap(Class<T> arg0) {

   @Override
   public List<RelCollation> getCollationList() {
-    ImmutableList.Builder<RelFieldCollation> collationList = new ImmutableList.Builder<RelFieldCollation>();
+    ImmutableList.Builder<RelFieldCollation> collationList = new ImmutableList.Builder<>();
     for (Order sortColumn : this.hiveTblMetadata.getSortCols()) {
       for (int i=0; i<this.hiveTblMetadata.getSd().getCols().size(); i++) {
         FieldSchema field = this.hiveTblMetadata.getSd().getCols().get(i);

@@ -411,7 +409,7 @@ public List<RelCollation> getCollationList() {

   @Override
   public RelDistribution getDistribution() {
-    ImmutableList.Builder<Integer> columnPositions = new ImmutableList.Builder<Integer>();
+    ImmutableList.Builder<Integer> columnPositions = new ImmutableList.Builder<>();
     for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) {
       for (int i=0; i<this.hiveTblMetadata.getSd().getCols().size(); i++) {
         FieldSchema field = this.hiveTblMetadata.getSd().getCols().get(i);

@@ -435,7 +433,7 @@ public double getRowCount() {
     if (null == partitionList) {
       // we are here either unpartitioned table or partitioned table with no
       // predicates
-      computePartitionList(hiveConf, null, new HashSet<Integer>());
+      computePartitionList(hiveConf, null, new HashSet<>());
     }
     rowCount = StatsUtils.getNumRows(hiveConf, getNonPartColumns(), hiveTblMetadata,
         partitionList, noColsMissingStats);

@@ -465,7 +463,7 @@ private String getColNamesForLogging(Set<String> colLst) {
   public void computePartitionList(HiveConf conf, RexNode pruneNode, Set<Integer> partOrVirtualCols) {
     try {
       if (!hiveTblMetadata.isPartitioned() || pruneNode == null
-          || InputFinder.bits(pruneNode).length() == 0) {
+          || InputFinder.bits(pruneNode).isEmpty()) {
         // there is no predicate on partitioning column, we need all partitions
         // in this case.
         partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(),

@@ -485,11 +483,11 @@ public void computePartitionList(HiveConf conf, RexNode pruneNode, Set<Integer>
   }

   private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats) {
-    List<String> nonPartColNamesThatRqrStats = new ArrayList<String>();
-    List<Integer> nonPartColIndxsThatRqrStats = new ArrayList<Integer>();
-    List<String> partColNamesThatRqrStats = new ArrayList<String>();
-    List<Integer> partColIndxsThatRqrStats = new ArrayList<Integer>();
-    Set<String> colNamesFailedStats = new HashSet<String>();
+    List<String> nonPartColNamesThatRqrStats = new ArrayList<>();
+    List<Integer> nonPartColIndxsThatRqrStats = new ArrayList<>();
+    List<String> partColNamesThatRqrStats = new ArrayList<>();
+    List<Integer> partColIndxsThatRqrStats = new ArrayList<>();
+    Set<String> colNamesFailedStats = new HashSet<>();

     // 1. Separate required columns to Non Partition and Partition Cols
     ColumnInfo tmp;

@@ -514,19 +512,19 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
     if (null == partitionList) {
       // We could be here either because its an unpartitioned table or because
       // there are no pruning predicates on a partitioned table.
-      computePartitionList(hiveConf, null, new HashSet<Integer>());
+      computePartitionList(hiveConf, null, new HashSet<>());
     }

-    String partitionListKey = partitionList.getKey().orElse(null);
Contributor: Why was the null key check removed?

Member (Author): I removed the redundant Optional.
-    ColumnStatsList colStatsCached = colStatsCache.get(partitionListKey);
-    if (colStatsCached == null) {
-      colStatsCached = new ColumnStatsList();
-      colStatsCache.put(partitionListKey, colStatsCached);
-    }
+    String partitionListKey = partitionList.getKey();
+
+    ColumnStatsList colStatsCached = colStatsCache.computeIfAbsent(
+        partitionListKey,
+        k -> new ColumnStatsList()
+    );

     // 2. Obtain Col Stats for Non Partition Cols
-    if (nonPartColNamesThatRqrStats.size() > 0) {
-      List<ColStatistics> hiveColStats = new ArrayList<ColStatistics>();
+    if (!nonPartColNamesThatRqrStats.isEmpty()) {
+      List<ColStatistics> hiveColStats = new ArrayList<>();

       if (!hiveTblMetadata.isPartitioned()) {
         // 2.1 Handle the case for unpartitioned table.

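The hunk above replaces a manual get-then-put cache population with Map.computeIfAbsent. Below is a minimal, self-contained sketch of the two equivalent patterns; the class name and the plain List<String> standing in for ColumnStatsList are illustrative, not the actual Hive types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheLookupSketch {
  public static void main(String[] args) {
    Map<String, List<String>> colStatsCache = new HashMap<>();
    String partitionListKey = "db.tbl;"; // illustrative key value

    // Old pattern: explicit lookup, null check, and insert.
    List<String> cached = colStatsCache.get(partitionListKey);
    if (cached == null) {
      cached = new ArrayList<>();
      colStatsCache.put(partitionListKey, cached);
    }

    // New pattern: create and register the entry on demand in a single call.
    List<String> cached2 = colStatsCache.computeIfAbsent(partitionListKey, k -> new ArrayList<>());

    System.out.println(cached == cached2); // true: both refer to the same cached instance
  }
}
```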
@@ -547,9 +545,9 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
       if (hiveColStats.isEmpty()) {
         colNamesFailedStats.addAll(nonPartColNamesThatRqrStats);
       } else if (hiveColStats.size() != nonPartColNamesThatRqrStats.size()) {
-        Set<String> setOfFiledCols = new HashSet<String>(nonPartColNamesThatRqrStats);
+        Set<String> setOfFiledCols = new HashSet<>(nonPartColNamesThatRqrStats);

-        Set<String> setOfObtainedColStats = new HashSet<String>();
+        Set<String> setOfObtainedColStats = new HashSet<>();
         for (ColStatistics cs : hiveColStats) {
           setOfObtainedColStats.add(cs.getColumnName());
         }

@@ -561,7 +559,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
         // nonPartColNamesThatRqrStats. reorder hiveColStats so we can build hiveColStatsMap
         // using nonPartColIndxsThatRqrStats as below
         Map<String, ColStatistics> columnStatsMap =
-            new HashMap<String, ColStatistics>(hiveColStats.size());
+            new HashMap<>(hiveColStats.size());
         for (ColStatistics cs : hiveColStats) {
           columnStatsMap.put(cs.getColumnName(), cs);
           // even though the stats were estimated we need to warn user that

@@ -586,22 +584,21 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
         if (partitionList.getNotDeniedPartns().isEmpty()) {
           // no need to make a metastore call
           rowCount = 0;
-          hiveColStats = new ArrayList<ColStatistics>();
+          hiveColStats = new ArrayList<>();
           for (int i = 0; i < nonPartColNamesThatRqrStats.size(); i++) {
             // add empty stats object for each column
             hiveColStats.add(
                 new ColStatistics(
                     nonPartColNamesThatRqrStats.get(i),
                     hiveNonPartitionColsMap.get(nonPartColIndxsThatRqrStats.get(i)).getTypeName()));
           }
-          colNamesFailedStats.clear();
Contributor: Why was it removed?

Member (Author): It is redundant; there is nothing to clear yet at this point.
           colStatsCached.updateState(State.COMPLETE);
         } else {
           Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList,
               hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached,
               nonPartColNamesThatRqrStats, true);
           rowCount = stats.getNumRows();
-          hiveColStats = new ArrayList<ColStatistics>();
+          hiveColStats = new ArrayList<>();
           for (String c : nonPartColNamesThatRqrStats) {
             ColStatistics cs = stats.getColumnStatisticsFromColName(c);
             if (cs != null) {

@@ -622,7 +619,7 @@ private void updateColStats(Set<Integer> projIndxLst, boolean allowMissingStats)
         }
       }

-      if (hiveColStats != null && hiveColStats.size() == nonPartColNamesThatRqrStats.size()) {
Contributor: Why was the null check removed?

Member (Author): It can't be null here; the IDE reported the same.
+      if (hiveColStats.size() == nonPartColNamesThatRqrStats.size()) {
         for (int i = 0; i < hiveColStats.size(); i++) {
           // the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats
           // are in same order

@@ -754,7 +751,7 @@ public int hashCode() {
   }

   public String getPartitionListKey() {
-    return partitionList != null ? partitionList.getKey().orElse(null) : null;
+    return partitionList != null ? partitionList.getKey() : null;
   }

 }

@@ -184,8 +184,11 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
     String key = tab.getFullyQualifiedName() + ";";
     if (tab.getMetaTable() != null) {
       key = tab.getFullyQualifiedName() + "." + tab.getMetaTable() + ";";
-    } else if (tab.getSnapshotRef() != null) {
-      key = tab.getFullyQualifiedName() + "." + tab.getSnapshotRef() + ";";
+    } else if (tab.isNonNative()) {
Contributor: Should we currently limit ourselves to only Iceberg tables, rather than all non-native tables?

Member (Author): Other non-native tables would have a fallback, a -1 snapshot id (see the sketch after this hunk).
+      long snapshotId = tab.getStorageHandler().getSnapshotId(tab);
+      if (snapshotId > 0) {
+        key = tab.getFullyQualifiedName() + "." + snapshotId + ";";
+      }
     }

     if (!tab.isPartitioned()) {

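A rough sketch of the cache-key fallback discussed in the thread above; the helper name, its parameters, and the "non-positive (e.g. -1) means no snapshot" convention are illustrative assumptions based on the review answer, not the exact Hive API:

```java
public class PruningKeySketch {

  // Stand-ins for Table.getFullyQualifiedName(), getMetaTable(), isNonNative(),
  // and getStorageHandler().getSnapshotId(tab); only the branches relevant to
  // the thread above are sketched.
  static String pruningCacheKey(String fullyQualifiedName, String metaTable,
      boolean nonNative, long snapshotId) {
    String key = fullyQualifiedName + ";";
    if (metaTable != null) {
      key = fullyQualifiedName + "." + metaTable + ";";
    } else if (nonNative && snapshotId > 0) {
      // Iceberg-like handlers report a positive snapshot id, so the cached
      // partition list (and the column stats keyed off it) changes whenever
      // a new snapshot is committed.
      key = fullyQualifiedName + "." + snapshotId + ";";
    }
    // Handlers without snapshot support keep the plain fully-qualified-name key.
    return key;
  }

  public static void main(String[] args) {
    System.out.println(pruningCacheKey("db.ice_tbl", null, true, 42L));   // db.ice_tbl.42;
    System.out.println(pruningCacheKey("db.other_tbl", null, true, -1L)); // db.other_tbl;
  }
}
```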
@@ -441,7 +444,7 @@ static private boolean hasUserFunctions(ExprNodeDesc expr) {
     return false;
   }

-  private static PrunedPartitionList getPartitionsFromServer(Table tab, String key, ExprNodeDesc compactExpr,
+  private static PrunedPartitionList getPartitionsFromServer(Table tab, String key, ExprNodeDesc compactExpr,
       HiveConf conf, Set<String> partColsUsedInFilter, boolean isPruningByExactFilter)
       throws SemanticException {
     try {

Contributor: Should we handle the exception case where the table is null?

Member (Author): The table shouldn't be null here; if it is null, an exception is expected.