From be857bf83c00e41a315a7cc8e013832cf169ef18 Mon Sep 17 00:00:00 2001
From: Krisztian Kasa
Date: Tue, 9 Apr 2024 14:58:52 +0200
Subject: [PATCH] HIVE-28126: Use added record count in cost model when rebuilding materialized view stored by iceberg (Krisztian Kasa, reviewed by Denys Kuzmenko, okumin)

---
 .../mr/hive/HiveIcebergStorageHandler.java    |  70 ++-
 .../hive/TestHiveIcebergStorageHandler.java   |  59 ++
 .../test/queries/positive/mv_iceberg_orc4.q   |  19 +-
 .../results/positive/mv_iceberg_orc4.q.out    | 572 +++++++++++++++++-
 .../results/positive/mv_iceberg_orc7.q.out    |   2 +-
 .../formatter/TextDescTableFormatter.java     |  10 +-
 .../AlterMaterializedViewRebuildAnalyzer.java |  41 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java  |  53 +-
 .../hive/ql/metadata/HiveStorageHandler.java  |  26 +
 .../stats/HiveIncrementalRelMdRowCount.java   |  10 +
 .../materialized_view_create_rewrite_7.q.out  |   2 +-
 ...terialized_view_create_rewrite_nulls.q.out |   2 +-
 .../hive/common/type/SnapshotContext.java     |  36 ++
 13 files changed, 845 insertions(+), 57 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 8aa833c4c18c..369bc91fbcc4 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -137,6 +137,7 @@ import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -1618,7 +1619,58 @@ public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metad
     if (current == null) {
       return null;
     }
-    return new SnapshotContext(current.snapshotId());
+    return toSnapshotContext(current);
+  }
+
+  private SnapshotContext toSnapshotContext(Snapshot snapshot) {
+    Map<String, String> summaryMap = snapshot.summary();
+    long addedRecords = getLongSummary(summaryMap, SnapshotSummary.ADDED_RECORDS_PROP);
+    long deletedRecords = getLongSummary(summaryMap, SnapshotSummary.DELETED_RECORDS_PROP);
+    return new SnapshotContext(
+        snapshot.snapshotId(), toWriteOperationType(snapshot.operation()), addedRecords, deletedRecords);
+  }
+
+  private SnapshotContext.WriteOperationType toWriteOperationType(String operation) {
+    try {
+      return SnapshotContext.WriteOperationType.valueOf(operation.toUpperCase());
+    } catch (NullPointerException | IllegalArgumentException ex) {
+      return SnapshotContext.WriteOperationType.UNKNOWN;
+    }
+  }
+
+  private long getLongSummary(Map<String, String> summaryMap, String key) {
+    String textValue = summaryMap.get(key);
+    if (StringUtils.isBlank(textValue)) {
+      return 0;
+    }
+    return Long.parseLong(textValue);
+  }
+
+  @Override
+  public Iterable<SnapshotContext> getSnapshotContexts(
+      org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
+
+    TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+    Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+    return getSnapshots(table.snapshots(), since);
+  }
+
+  @VisibleForTesting
+  Iterable<SnapshotContext> getSnapshots(Iterable<Snapshot> snapshots, SnapshotContext since) {
+    List<SnapshotContext> result = Lists.newArrayList();
+
+    boolean foundSince = Objects.isNull(since);
+    for (Snapshot snapshot : snapshots) {
+      if (!foundSince) {
+        if (snapshot.snapshotId() ==
since.getSnapshotId()) { + foundSince = true; + } + } else { + result.add(toSnapshotContext(snapshot)); + } + } + + return foundSince ? result : Collections.emptyList(); } @Override @@ -1692,6 +1744,20 @@ public void addResourcesForCreateTable(Map tblProps, HiveConf hi } } + /** + * Check the operation type of all snapshots which are newer than the specified. The specified snapshot is excluded. + * @deprecated + *
Use {@link HiveStorageHandler#getSnapshotContexts( + * org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since)} + * and check {@link SnapshotContext.WriteOperationType#APPEND}.equals({@link SnapshotContext#getOperation()}). + * + * @param hmsTable table metadata stored in Hive Metastore + * @param since the snapshot preceding the oldest snapshot which should be checked. + * The value null means all should be checked. + * @return null if table is empty, true if all snapshots are {@link SnapshotContext.WriteOperationType#APPEND}s, + * false otherwise. + */ + @Deprecated @Override public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) { TableDesc tableDesc = Utilities.getTableDesc(hmsTable); @@ -1708,7 +1774,7 @@ Boolean hasAppendsOnly(Iterable snapshots, SnapshotContext since) { foundSince = true; } } else { - if (!"append".equals(snapshot.operation())) { + if (!DataOperations.APPEND.equals(snapshot.operation())) { return false; } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java index 92697a4649dd..1a2eddcf5f1b 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java @@ -22,8 +22,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import org.apache.commons.collections4.IterableUtils; import org.apache.hadoop.hive.common.type.SnapshotContext; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,10 +55,18 @@ public class TestHiveIcebergStorageHandler { @Before public void before() { when(anySnapshot.snapshotId()).thenReturn(42L); + Mockito.lenient().when(appendSnapshot.snapshotId()).thenReturn(20L); + Map summary = Maps.newHashMap(); + summary.put(SnapshotSummary.ADDED_RECORDS_PROP, "12"); + Mockito.lenient().when(appendSnapshot.summary()).thenReturn(summary); when(appendSnapshot.operation()).thenReturn("append"); + Mockito.lenient().when(deleteSnapshot.snapshotId()).thenReturn(100L); when(deleteSnapshot.operation()).thenReturn("delete"); + summary = Maps.newHashMap(); + summary.put(SnapshotSummary.DELETED_RECORDS_PROP, "3"); + Mockito.lenient().when(deleteSnapshot.summary()).thenReturn(summary); } @Test @@ -123,4 +135,51 @@ public void testHasAppendsOnlyReturnsNullWhenGivenSnapshotNotInTheList() { assertThat(result, is(nullValue())); } + + @Test + public void testGetSnapshotContextsReturnsEmptyIterableWhenTableIsEmpty() { + SnapshotContext since = new SnapshotContext(42); + + HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler(); + Iterable result = storageHandler.getSnapshots(Collections.emptyList(), since); + + assertThat(result.iterator().hasNext(), is(false)); + } + + @Test + public void testGetSnapshotContextsReturnsEmptyIterableWhenTableIsEmptyAndGivenSnapShotIsNull() { + HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler(); + Iterable result = storageHandler.getSnapshots(Collections.emptyList(), null); + + assertThat(result.iterator().hasNext(), is(false)); + } + + @Test + public void 
testGetSnapshotContextsReturnsAllSnapshotsWhenGivenSnapshotIsNull() { + HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler(); + Iterable result = storageHandler.getSnapshots(asList(appendSnapshot, deleteSnapshot), null); + + List resultList = IterableUtils.toList(result); + assertThat(resultList.size(), is(2)); + assertThat(resultList.get(0).getSnapshotId(), is(appendSnapshot.snapshotId())); + assertThat(resultList.get(0).getOperation(), is(SnapshotContext.WriteOperationType.APPEND)); + assertThat(resultList.get(0).getAddedRowCount(), is(12L)); + assertThat(resultList.get(0).getDeletedRowCount(), is(0L)); + + assertThat(resultList.get(1).getSnapshotId(), is(deleteSnapshot.snapshotId())); + assertThat(resultList.get(1).getOperation(), is(SnapshotContext.WriteOperationType.DELETE)); + assertThat(resultList.get(1).getAddedRowCount(), is(0L)); + assertThat(resultList.get(1).getDeletedRowCount(), is(3L)); + } + + @Test + public void testGetSnapshotContextsReturnsEmptyIterableWhenGivenSnapshotNotInTheList() { + SnapshotContext since = new SnapshotContext(1); + List snapshotList = Arrays.asList(anySnapshot, appendSnapshot, deleteSnapshot); + + HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler(); + Iterable result = storageHandler.getSnapshots(snapshotList, since); + + assertThat(result.iterator().hasNext(), is(false)); + } } diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q index 494ea821a71c..aea903996b1f 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q @@ -1,7 +1,9 @@ -- MV source tables are iceberg and MV has aggregate -- SORT_QUERY_RESULTS --! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/ +--! 
qt:replace:/(\s+Version\sinterval\sfrom\:\s+)\d+(\s*)/$1#Masked#/ +set hive.explain.user=false; set hive.support.concurrency=true; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; @@ -13,7 +15,7 @@ create external table tbl_ice_v2(d int, e string, f int) stored by iceberg store insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54); insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54); -create materialized view mat1 as +create materialized view mat1 stored by iceberg stored as orc tblproperties ('format-version'='2') as select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f) from tbl_ice join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52 @@ -23,8 +25,23 @@ group by tbl_ice.b, tbl_ice.c; insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54); insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54); +-- Incremental rebuild plan is more expensive than full rebuild +set hive.materializedview.rebuild.incremental.factor=10.0; + explain cbo alter materialized view mat1 rebuild; +explain +alter materialized view mat1 rebuild; + + +-- Incremental rebuild plan is cheaper than full rebuild +set hive.materializedview.rebuild.incremental.factor=0.01; + +explain cbo +alter materialized view mat1 rebuild; +explain +alter materialized view mat1 rebuild; + alter materialized view mat1 rebuild; select * from mat1; diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out index 0f0583e52d42..0f52fa18f33c 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out @@ -36,7 +36,7 @@ POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@tbl_ice_v2 -PREHOOK: query: create materialized view mat1 as +PREHOOK: query: create materialized view mat1 stored by iceberg stored as orc tblproperties ('format-version'='2') as select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f) from tbl_ice join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52 @@ -46,7 +46,8 @@ PREHOOK: Input: default@tbl_ice PREHOOK: Input: default@tbl_ice_v2 PREHOOK: Output: database:default PREHOOK: Output: default@mat1 -POSTHOOK: query: create materialized view mat1 as +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: create materialized view mat1 stored by iceberg stored as orc tblproperties ('format-version'='2') as select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f) from tbl_ice join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52 @@ -56,6 +57,7 @@ POSTHOOK: Input: default@tbl_ice POSTHOOK: Input: default@tbl_ice_v2 POSTHOOK: Output: database:default POSTHOOK: Output: default@mat1 +POSTHOOK: Output: hdfs://### HDFS PATH ### POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ] POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ] POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ] @@ -105,21 +107,581 @@ HiveAggregate(group=[{0, 1}], agg#0=[sum($2)]) HiveProject(b=[$0], c=[$1], _c2=[$2]) HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1]) +PREHOOK: query: explain +alter materialized view 
mat1 rebuild +PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD +PREHOOK: Input: default@mat1 +PREHOOK: Input: default@tbl_ice +PREHOOK: Input: default@tbl_ice_v2 +PREHOOK: Output: default@mat1 +POSTHOOK: query: explain +alter materialized view mat1 rebuild +POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD +POSTHOOK: Input: default@mat1 +POSTHOOK: Input: default@tbl_ice +POSTHOOK: Input: default@tbl_ice_v2 +POSTHOOK: Output: default@mat1 +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + Stage-4 depends on stages: Stage-3 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 8 <- Union 4 (CONTAINS) + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Union 4 (CONTAINS) + Reducer 5 <- Union 4 (SIMPLE_EDGE) + Reducer 6 <- Reducer 5 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tbl_ice + filterExpr: ((c > 52) and a is not null) (type: boolean) + Statistics: Num rows: 8 Data size: 768 Basic stats: COMPLETE Column stats: COMPLETE + Version interval from: #Masked# + Filter Operator + predicate: ((c > 52) and a is not null) (type: boolean) + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: a (type: int), b (type: string), c (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string), _col2 (type: int) + Execution mode: vectorized + Map 7 + Map Operator Tree: + TableScan + alias: tbl_ice_v2 + filterExpr: d is not null (type: boolean) + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + Version interval from: #Masked# + Filter Operator + predicate: d is not null (type: boolean) + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: d (type: int), f (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: int) + Execution mode: vectorized + Map 8 + Map Operator Tree: + TableScan + alias: default.mat1 + Statistics: Num rows: 2 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: b (type: string), c (type: int), _c2 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 2 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col2) + keys: _col0 (type: string), _col1 (type: int) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition 
columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: bigint) + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col1, _col2, _col4 + Statistics: Num rows: 6 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col4) + keys: _col1 (type: string), _col2 (type: int) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: bigint) + Reducer 3 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: int) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col2) + keys: _col0 (type: string), _col1 (type: int) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: bigint) + Reducer 5 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: int) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.mat1 + Select Operator + expressions: _col0 (type: string), _col1 (type: int), _col2 (type: bigint) + outputColumnNames: b, c, _c2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: max(length(b)), avg(COALESCE(length(b),0)), count(1), count(b), compute_bit_vector_hll(b), min(c), max(c), count(c), compute_bit_vector_hll(c), min(_c2), max(_c2), count(_c2), compute_bit_vector_hll(_c2) + minReductionHashAggr: 0.75 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 568 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 568 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: struct), _col2 (type: 
bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: int), _col7 (type: bigint), _col8 (type: binary), _col9 (type: bigint), _col10 (type: bigint), _col11 (type: bigint), _col12 (type: binary) + Reducer 6 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: max(VALUE._col0), avg(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col0,0)) (type: bigint), COALESCE(_col1,0) (type: double), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'LONG' (type: string), UDFToLong(_col5) (type: bigint), UDFToLong(_col6) (type: bigint), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), _col9 (type: bigint), _col10 (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17 + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 4 + Vertex: Union 4 + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.mat1 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + Column Stats Desc: + Columns: b, c, _c2 + Column Types: string, int, bigint + Table: default.mat1 + + Stage: Stage-4 + Materialized View Update + name: default.mat1 + update creation metadata: true + +PREHOOK: query: explain cbo +alter materialized view mat1 rebuild +PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD +PREHOOK: Input: default@mat1 +PREHOOK: Input: default@tbl_ice +PREHOOK: Input: default@tbl_ice_v2 +PREHOOK: Output: default@mat1 +PREHOOK: Output: default@mat1 +POSTHOOK: query: explain cbo +alter materialized view mat1 rebuild +POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD +POSTHOOK: Input: default@mat1 +POSTHOOK: Input: default@tbl_ice +POSTHOOK: Input: default@tbl_ice_v2 +POSTHOOK: Output: default@mat1 +POSTHOOK: Output: default@mat1 +CBO PLAN: +HiveProject($f0=[$4], $f1=[$5], $f2=[CASE(IS NULL($2), $6, IS NULL($6), $2, +($6, $2))]) + HiveFilter(condition=[OR($3, IS NULL($3))]) + HiveJoin(condition=[AND(IS NOT DISTINCT FROM($0, $4), IS NOT DISTINCT FROM($1, $5))], joinType=[right], algorithm=[BucketJoin], cost=[not available]) + HiveProject(b=[$0], c=[$1], 
_c2=[$2], $f3=[true]) + HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1]) + HiveProject(b=[$0], c=[$1], $f2=[$2]) + HiveAggregate(group=[{1, 2}], agg#0=[sum($4)]) + HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[CommonJoin], cost=[not available]) + HiveProject(a=[$0], b=[$1], c=[$2]) + HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))]) + HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#]) + HiveProject(d=[$0], f=[$2]) + HiveFilter(condition=[IS NOT NULL($0)]) + HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2], fromVersion=[#Masked#]) + +PREHOOK: query: explain +alter materialized view mat1 rebuild +PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD +PREHOOK: Input: default@mat1 +PREHOOK: Input: default@tbl_ice +PREHOOK: Input: default@tbl_ice_v2 +PREHOOK: Output: default@mat1 +PREHOOK: Output: default@mat1 +POSTHOOK: query: explain +alter materialized view mat1 rebuild +POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD +POSTHOOK: Input: default@mat1 +POSTHOOK: Input: default@tbl_ice +POSTHOOK: Input: default@tbl_ice_v2 +POSTHOOK: Output: default@mat1 +POSTHOOK: Output: default@mat1 +STAGE DEPENDENCIES: + Stage-3 is a root stage + Stage-4 depends on stages: Stage-3 + Stage-0 depends on stages: Stage-4 + Stage-5 depends on stages: Stage-0 + Stage-6 depends on stages: Stage-5 + +STAGE PLANS: + Stage: Stage-3 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) + Reducer 4 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 5 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 7 <- Map 6 (SIMPLE_EDGE), Map 9 (SIMPLE_EDGE) + Reducer 8 <- Reducer 7 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: default.mat1 + Statistics: Num rows: 2 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: b (type: string), c (type: int), _c2 (type: bigint), true (type: boolean), PARTITION__SPEC__ID (type: int), PARTITION__HASH (type: bigint), FILE__PATH (type: string), ROW__POSITION (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7 + Statistics: Num rows: 2 Data size: 616 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 2 Data size: 616 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: bigint), _col3 (type: boolean), _col4 (type: int), _col5 (type: bigint), _col6 (type: string), _col7 (type: bigint) + Execution mode: vectorized + Map 6 + Map Operator Tree: + TableScan + alias: tbl_ice + filterExpr: ((c > 52) and a is not null) (type: boolean) + Statistics: Num rows: 8 Data size: 768 Basic stats: COMPLETE Column stats: COMPLETE + Version interval from: #Masked# + Filter Operator + predicate: ((c > 52) and a is not null) (type: boolean) + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: a (type: int), b (type: string), c (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: 
int) + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string), _col2 (type: int) + Execution mode: vectorized + Map 9 + Map Operator Tree: + TableScan + alias: tbl_ice_v2 + filterExpr: d is not null (type: boolean) + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + Version interval from: #Masked# + Filter Operator + predicate: d is not null (type: boolean) + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: d (type: int), f (type: int) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: int) + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Merge Join Operator + condition map: + Right Outer Join 0 to 1 + keys: + 0 _col0 (type: string), _col1 (type: int) + 1 _col0 (type: string), _col1 (type: int) + nullSafes: [true, true] + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10 + Statistics: Num rows: 6 Data size: 1524 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: _col3 (type: boolean) + Statistics: Num rows: 1 Data size: 408 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col4 (type: int), _col5 (type: bigint), _col6 (type: string), _col7 (type: bigint), _col0 (type: string), _col1 (type: int), _col2 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 1 Data size: 304 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint) + null sort order: aaaa + sort order: ++++ + Statistics: Num rows: 1 Data size: 304 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col4 (type: string), _col5 (type: int), _col6 (type: bigint) + Filter Operator + predicate: _col3 (type: boolean) + Statistics: Num rows: 1 Data size: 408 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col8 (type: string), _col9 (type: int), CASE WHEN (_col2 is null) THEN (_col10) WHEN (_col10 is null) THEN (_col2) ELSE ((_col10 + _col2)) END (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.mat1 + Select Operator + expressions: _col0 (type: string), _col1 (type: int), _col2 (type: bigint) + outputColumnNames: b, c, _c2 + Statistics: Num rows: 1 Data size: 100 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: max(length(b)), avg(COALESCE(length(b),0)), count(1), count(b), compute_bit_vector_hll(b), min(c), max(c), count(c), compute_bit_vector_hll(c), min(_c2), max(_c2), count(_c2), compute_bit_vector_hll(_c2) + minReductionHashAggr: 0.4 + mode: hash + 
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 568 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 568 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: struct), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: int), _col7 (type: bigint), _col8 (type: binary), _col9 (type: bigint), _col10 (type: bigint), _col11 (type: bigint), _col12 (type: binary) + Filter Operator + predicate: _col3 is null (type: boolean) + Statistics: Num rows: 4 Data size: 1016 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col8 (type: string), _col9 (type: int), CASE WHEN (_col2 is null) THEN (_col10) WHEN (_col10 is null) THEN (_col2) ELSE ((_col10 + _col2)) END (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.mat1 + Select Operator + expressions: _col0 (type: string), _col1 (type: int), _col2 (type: bigint) + outputColumnNames: b, c, _c2 + Statistics: Num rows: 4 Data size: 384 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: max(length(b)), avg(COALESCE(length(b),0)), count(1), count(b), compute_bit_vector_hll(b), min(c), max(c), count(c), compute_bit_vector_hll(c), min(_c2), max(_c2), count(_c2), compute_bit_vector_hll(_c2) + minReductionHashAggr: 0.75 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 568 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 568 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: int), _col1 (type: struct), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: int), _col7 (type: bigint), _col8 (type: binary), _col9 (type: bigint), _col10 (type: bigint), _col11 (type: bigint), _col12 (type: binary) + Reducer 3 + Execution mode: vectorized + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: string), VALUE._col1 (type: int), VALUE._col2 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 1 Data size: 304 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 304 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.mat1 + Reducer 4 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: max(VALUE._col0), avg(VALUE._col1), 
count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col0,0)) (type: bigint), COALESCE(_col1,0) (type: double), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'LONG' (type: string), UDFToLong(_col5) (type: bigint), UDFToLong(_col6) (type: bigint), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), _col9 (type: bigint), _col10 (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17 + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: max(VALUE._col0), avg(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), min(VALUE._col5), max(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'STRING' (type: string), UDFToLong(COALESCE(_col0,0)) (type: bigint), COALESCE(_col1,0) (type: double), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'LONG' (type: string), UDFToLong(_col5) (type: bigint), UDFToLong(_col6) (type: bigint), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), _col9 (type: bigint), _col10 (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17 + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 794 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 7 + Reduce 
Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col1, _col2, _col4 + Statistics: Num rows: 6 Data size: 576 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col4) + keys: _col1 (type: string), _col2 (type: int) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: bigint) + Reducer 8 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: string), KEY._col1 (type: int) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: int) + Statistics: Num rows: 4 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: bigint) + + Stage: Stage-4 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: false + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.mat1 + + Stage: Stage-5 + Stats Work + Basic Stats Work: + Column Stats Desc: + Columns: b, c, _c2 + Column Types: string, int, bigint + Table: default.mat1 + + Stage: Stage-6 + Materialized View Update + name: default.mat1 + update creation metadata: true + PREHOOK: query: alter materialized view mat1 rebuild PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD PREHOOK: Input: default@mat1 PREHOOK: Input: default@tbl_ice PREHOOK: Input: default@tbl_ice_v2 PREHOOK: Output: default@mat1 +PREHOOK: Output: default@mat1 POSTHOOK: query: alter materialized view mat1 rebuild POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD POSTHOOK: Input: default@mat1 POSTHOOK: Input: default@tbl_ice POSTHOOK: Input: default@tbl_ice_v2 POSTHOOK: Output: default@mat1 -POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat1)default.mat1.FieldSchema(name:_c2, type:bigint, comment:null), ] -POSTHOOK: Lineage: mat1.b EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), (mat1)default.mat1.FieldSchema(name:b, type:string, comment:null), ] -POSTHOOK: Lineage: mat1.c EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), (mat1)default.mat1.FieldSchema(name:c, type:int, comment:null), ] +POSTHOOK: Output: default@mat1 PREHOOK: query: select * from mat1 PREHOOK: type: QUERY PREHOOK: Input: default@mat1 diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out index dbbdd39e35ea..93e56b61b96b 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out @@ -76,7 
+76,7 @@ POSTHOOK: Output: default@mat1 CBO PLAN: HiveProject($f0=[$3], $f1=[CASE(IS NULL($1), $4, IS NULL($4), $1, +($4, $1))]) HiveFilter(condition=[OR($2, IS NULL($2))]) - HiveJoin(condition=[IS NOT DISTINCT FROM($0, $3)], joinType=[right], algorithm=[none], cost=[not available]) + HiveJoin(condition=[IS NOT DISTINCT FROM($0, $3)], joinType=[right], algorithm=[BucketJoin], cost=[not available]) HiveProject(a=[$0], _c1=[$1], $f2=[true]) HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1]) HiveProject(a=[$0], $f1=[$1]) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java index badd90953adb..83dd37bd70e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.type.SnapshotContext; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.TableType; @@ -66,7 +67,6 @@ import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.Map.Entry; @@ -284,7 +284,13 @@ private static void getMaterializedViewInfo(StringBuilder tableInfo, Table table private static MaterializationSnapshotFormatter createMaterializationSnapshotFormatter( MaterializationSnapshot snapshot) { if (snapshot != null && snapshot.getTableSnapshots() != null && !snapshot.getTableSnapshots().isEmpty()) { - return qualifiedTableName -> Objects.toString(snapshot.getTableSnapshots().get(qualifiedTableName), "Unknown"); + return qualifiedTableName -> { + SnapshotContext snapshotContext = snapshot.getTableSnapshots().get(qualifiedTableName); + if (snapshotContext == null) { + return "Unknown"; + } + return String.format("SnapshotContext{snapshotId=%d}", snapshotContext.getSnapshotId()); + }; } else if (snapshot != null && snapshot.getValidTxnList() != null) { ValidTxnWriteIdList validReaderWriteIdList = new ValidTxnWriteIdList(snapshot.getValidTxnList()); return qualifiedTableName -> { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java index 2c1f148d3037..3c6d973b1a72 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveAugmentSnapshotMaterializationRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveInsertOnlyScanWriteIdRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertIncrementalRewritingRule; -import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializationRelMetadataProvider; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule; import 
org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewUtils; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HivePushdownSnapshotFilterRule; @@ -360,13 +359,15 @@ private RelNode applyRecordIncrementalRebuildPlan( } if (visitor.isContainsAggregate()) { - return applyAggregateInsertIncremental(basePlan, mdProvider, executorProvider, optCluster, calcitePreMVRewritingPlan); + return applyAggregateInsertIncremental( + basePlan, mdProvider, executorProvider, optCluster, materialization, calcitePreMVRewritingPlan); } else { return applyJoinInsertIncremental(basePlan, mdProvider, executorProvider); } case AVAILABLE: if (!materialization.isSourceTablesUpdateDeleteModified()) { - return applyAggregateInsertIncremental(basePlan, mdProvider, executorProvider, optCluster, calcitePreMVRewritingPlan); + return applyAggregateInsertIncremental( + basePlan, mdProvider, executorProvider, optCluster, materialization, calcitePreMVRewritingPlan); } else { return applyAggregateInsertDeleteIncremental(basePlan, mdProvider, executorProvider); } @@ -391,30 +392,28 @@ private RelNode applyAggregateInsertDeleteIncremental( private RelNode applyAggregateInsertIncremental( RelNode basePlan, RelMetadataProvider mdProvider, RexExecutor executorProvider, RelOptCluster optCluster, - RelNode calcitePreMVRewritingPlan) { + HiveRelOptMaterialization materialization, RelNode calcitePreMVRewritingPlan) { mvRebuildMode = MaterializationRebuildMode.AGGREGATE_INSERT_REBUILD; - basePlan = applyIncrementalRebuild(basePlan, mdProvider, executorProvider, + RelNode incrementalRebuildPlan = applyIncrementalRebuild(basePlan, mdProvider, executorProvider, HiveInsertOnlyScanWriteIdRule.INSTANCE, HiveAggregateInsertIncrementalRewritingRule.INSTANCE); // Make a cost-based decision factoring the configuration property - optCluster.invalidateMetadataQuery(); - RelMetadataQuery.THREAD_PROVIDERS.set(HiveMaterializationRelMetadataProvider.DEFAULT); - try { - RelMetadataQuery mq = RelMetadataQuery.instance(); - RelOptCost costOriginalPlan = mq.getCumulativeCost(calcitePreMVRewritingPlan); - final double factorSelectivity = HiveConf.getFloatVar( - conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL_FACTOR); - RelOptCost costRebuildPlan = mq.getCumulativeCost(basePlan).multiplyBy(factorSelectivity); - if (costOriginalPlan.isLe(costRebuildPlan)) { - mvRebuildMode = MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD; - return calcitePreMVRewritingPlan; - } + RelOptCost costOriginalPlan = calculateCost( + optCluster, mdProvider, HiveTezModelRelMetadataProvider.DEFAULT, calcitePreMVRewritingPlan); - return basePlan; - } finally { - optCluster.invalidateMetadataQuery(); - RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider)); + RelOptCost costIncrementalRebuildPlan = calculateCost(optCluster, mdProvider, + HiveIncrementalRelMdRowCount.createMetadataProvider(materialization), incrementalRebuildPlan); + + final double factorSelectivity = HiveConf.getFloatVar( + conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL_FACTOR); + costIncrementalRebuildPlan = costIncrementalRebuildPlan.multiplyBy(factorSelectivity); + + if (costOriginalPlan.isLe(costIncrementalRebuildPlan)) { + mvRebuildMode = MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD; + return calcitePreMVRewritingPlan; } + + return incrementalRebuildPlan; } private RelNode applyJoinInsertIncremental( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f44fb8f7f683..3ee27912e9e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -107,6 +107,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable; import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.common.type.SnapshotContext; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -2156,38 +2157,44 @@ private Materialization getMaterializationInvalidationInfo(MaterializedViewMetad // Mixing native and non-native acid source tables are not supported. If the first source is native acid // the rest is expected to be native acid return getMSC().getMaterializationInvalidationInfo( - metadata.creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY)); + metadata.creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY)); } } - MaterializationSnapshot mvSnapshot = MaterializationSnapshot.fromJson(metadata.creationMetadata.getValidTxnList()); + boolean allHasAppendsOnly = allTablesHasAppendsOnly(metadata); + Materialization materialization = new Materialization(); + // TODO: delete operations are not supported yet. + // Set setSourceTablesCompacted to false when delete is supported + materialization.setSourceTablesCompacted(!allHasAppendsOnly); + materialization.setSourceTablesUpdateDeleteModified(!allHasAppendsOnly); + return materialization; + } - boolean hasAppendsOnly = true; + private boolean allTablesHasAppendsOnly(MaterializedViewMetadata metadata) throws HiveException { + MaterializationSnapshot mvSnapshot = MaterializationSnapshot.fromJson(metadata.creationMetadata.getValidTxnList()); for (SourceTable sourceTable : metadata.getSourceTables()) { Table table = getTable(sourceTable.getTable().getDbName(), sourceTable.getTable().getTableName()); HiveStorageHandler storageHandler = table.getStorageHandler(); - if (storageHandler == null) { - Materialization materialization = new Materialization(); - materialization.setSourceTablesCompacted(true); - return materialization; - } - Boolean b = storageHandler.hasAppendsOnly( - table, mvSnapshot.getTableSnapshots().get(table.getFullyQualifiedName())); - if (b == null) { - Materialization materialization = new Materialization(); - materialization.setSourceTablesCompacted(true); - return materialization; - } else if (!b) { - hasAppendsOnly = false; - break; + // Currently mixing native and non-native source tables are not supported. + if (!(storageHandler != null && + storageHandler.areSnapshotsSupported() && + tableHasAppendsOnly(storageHandler, table, mvSnapshot))) { + return false; } } - Materialization materialization = new Materialization(); - // TODO: delete operations are not supported yet. 
- // Set setSourceTablesCompacted to false when delete is supported - materialization.setSourceTablesCompacted(!hasAppendsOnly); - materialization.setSourceTablesUpdateDeleteModified(!hasAppendsOnly); - return materialization; + + return true; + } + + private boolean tableHasAppendsOnly( + HiveStorageHandler storageHandler, Table table, MaterializationSnapshot mvSnapshot) { + for (SnapshotContext snapshot : storageHandler.getSnapshotContexts( + table, mvSnapshot.getTableSnapshots().get(table.getFullyQualifiedName()))) { + if (!SnapshotContext.WriteOperationType.APPEND.equals(snapshot.getOperation())) { + return false; + } + } + return true; } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index a2c476c9ad54..87f0afe50b24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -689,6 +689,20 @@ default SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.meta return null; } + /** + * Return snapshot metadata of table snapshots which are newer than the specified. + * The specified snapshot is excluded. + * @param hmsTable table metadata stored in Hive Metastore + * @param since the snapshot preceding the oldest snapshot which should be checked. + * The value null means all should be checked. + * @return Iterable of {@link SnapshotContext}. + */ + default Iterable getSnapshotContexts( + org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) { + return Collections.emptyList(); + } + + /** * Alter table operations can rely on this to customize the EnvironmentContext to be used during the alter table * invocation (both on client and server side of HMS) @@ -699,6 +713,18 @@ default void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTab EnvironmentContext environmentContext) { } + /** + * Check the operation type of all snapshots which are newer than the specified. The specified snapshot is excluded. + * @deprecated + *
Use {@link HiveStorageHandler#getSnapshotContexts(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since)} + * and check {@link SnapshotContext.WriteOperationType#APPEND}.equals({@link SnapshotContext#getOperation()}). + * + * @param hmsTable table metadata stored in Hive Metastore + * @param since the snapshot preceding the oldest snapshot which should be checked. + * The value null means all should be checked. + * @return null if table is empty, true if all snapshots are {@link SnapshotContext.WriteOperationType#APPEND}s, false otherwise. + */ + @Deprecated default Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) { return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveIncrementalRelMdRowCount.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveIncrementalRelMdRowCount.java index bc2427eefcd0..9527431aaf78 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveIncrementalRelMdRowCount.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveIncrementalRelMdRowCount.java @@ -27,8 +27,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.BuiltInMethod; import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.type.SnapshotContext; import org.apache.hadoop.hive.metastore.api.SourceTable; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.MaterializedViewMetadata; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTezModelRelMetadataProvider; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; @@ -36,6 +38,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.stream.StreamSupport; public class HiveIncrementalRelMdRowCount extends HiveRelMdRowCount { @@ -84,6 +87,13 @@ public Double getRowCount(TableScan rel, RelMetadataQuery mq) { return super.getRowCount(rel, mq); } + HiveStorageHandler storageHandler = table.getStorageHandler(); + if (storageHandler != null && storageHandler.areSnapshotsSupported()) { + SnapshotContext since = new SnapshotContext(Long.parseLong(table.getVersionIntervalFrom())); + return StreamSupport.stream(storageHandler.getSnapshotContexts(table, since).spliterator(), false) + .mapToDouble(SnapshotContext::getAddedRowCount).sum(); + } + return (double) sourceTable.getInsertedCount(); } } diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_7.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_7.q.out index 000f07b5a1d2..80d4fc63f0a0 100644 --- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_7.q.out +++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_7.q.out @@ -102,7 +102,7 @@ POSTHOOK: Output: default@mat1 CBO PLAN: HiveProject($f0=[$3], $f1=[CASE(IS NULL($1), $4, IS NULL($4), $1, +($4, $1))]) HiveFilter(condition=[OR($2, IS NULL($2))]) - HiveJoin(condition=[IS NOT DISTINCT FROM($0, $3)], joinType=[right], algorithm=[none], cost=[not available]) + HiveJoin(condition=[IS NOT DISTINCT FROM($0, $3)], joinType=[right], algorithm=[BucketJoin], cost=[not available]) HiveProject(a=[$0], _c1=[$1], $f2=[true]) HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1]) HiveProject(a=[$0], $f1=[$1]) diff --git 
a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_nulls.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_nulls.q.out
index 53978961bfaa..93a9cb52e2d7 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_nulls.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_nulls.q.out
@@ -118,7 +118,7 @@ Explain
 CBO PLAN:
 HiveProject($f0=[$6], $f1=[$7], $f2=[CASE(IS NULL($2), $8, IS NULL($8), $2, +($8, $2))], $f3=[CASE(IS NULL($3), $9, IS NULL($9), $3, CASE(<($9, $3), $9, $3))], $f4=[CASE(IS NULL($4), $10, IS NULL($10), $4, CASE(>($10, $4), $10, $4))])
   HiveFilter(condition=[OR($5, IS NULL($5))])
-    HiveJoin(condition=[AND(IS NOT DISTINCT FROM($0, $6), IS NOT DISTINCT FROM($1, $7))], joinType=[right], algorithm=[none], cost=[not available])
+    HiveJoin(condition=[AND(IS NOT DISTINCT FROM($0, $6), IS NOT DISTINCT FROM($1, $7))], joinType=[right], algorithm=[BucketJoin], cost=[not available])
       HiveProject(a=[$0], b=[$1], _c2=[$2], _c3=[$3], _c4=[$4], $f5=[true])
         HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1])
       HiveProject(a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4])
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java b/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
index 7d2080fb917d..59ff05e1b8aa 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
@@ -27,7 +27,18 @@
  * materialized view on Iceberg tables.
  */
 public class SnapshotContext {
+  public enum WriteOperationType {
+    APPEND,
+    REPLACE,
+    OVERWRITE,
+    DELETE,
+    UNKNOWN
+  }
+
   private long snapshotId;
+  private WriteOperationType operation;
+  private long addedRowCount;
+  private long deletedRowCount;

   /**
    * Constructor for json serialization
@@ -37,6 +48,16 @@ private SnapshotContext() {

   public SnapshotContext(long snapshotId) {
     this.snapshotId = snapshotId;
+    this.operation = null;
+    this.addedRowCount = 0;
+    this.deletedRowCount = 0;
+  }
+
+  public SnapshotContext(long snapshotId, WriteOperationType operation, long addedRowCount, long deletedRowCount) {
+    this.snapshotId = snapshotId;
+    this.operation = operation;
+    this.addedRowCount = addedRowCount;
+    this.deletedRowCount = deletedRowCount;
   }

   /**
@@ -64,10 +85,25 @@ public int hashCode() {
     return Objects.hash(snapshotId);
   }

+  public WriteOperationType getOperation() {
+    return operation;
+  }
+
+  public long getAddedRowCount() {
+    return addedRowCount;
+  }
+
+  public long getDeletedRowCount() {
+    return deletedRowCount;
+  }
+
   @Override
   public String toString() {
     return "SnapshotContext{" +
         "snapshotId=" + snapshotId +
+        ", operation=" + operation +
+        ", addedRowCount=" + addedRowCount +
+        ", deletedRowCount=" + deletedRowCount +
         '}';
   }
 }
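
For reference, a minimal sketch of how a caller of the new HiveStorageHandler#getSnapshotContexts API can derive both the append-only check used by Hive#getMaterializationInvalidationInfo and the added-row total fed into HiveIncrementalRelMdRowCount. The class name SnapshotContextUsageSketch and the method name addedRowsIfAppendOnly are illustrative only and are not part of this patch; SnapshotContext and WriteOperationType come from the change above.

import org.apache.hadoop.hive.common.type.SnapshotContext;

final class SnapshotContextUsageSketch {
  private SnapshotContextUsageSketch() {
  }

  // Sums the rows added after the materialized view snapshot; returns -1 as soon as a
  // non-APPEND snapshot is seen, i.e. when an insert-only incremental rebuild does not apply.
  static long addedRowsIfAppendOnly(Iterable<SnapshotContext> snapshotsSinceMv) {
    long addedRows = 0;
    for (SnapshotContext ctx : snapshotsSinceMv) {
      if (!SnapshotContext.WriteOperationType.APPEND.equals(ctx.getOperation())) {
        return -1;
      }
      addedRows += ctx.getAddedRowCount();
    }
    return addedRows;
  }
}

Because the added record counts come from the Iceberg snapshot summary, the incremental rebuild cost can be estimated from metadata alone, without scanning data files.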