From bd2970fd24d82aa555267ba90c21d350315734de Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Wed, 18 Sep 2024 14:26:41 +0200 Subject: [PATCH] HIVE-28282: Addendum: Iceberg: Merge-Insert with reserved keywords as column names (Denys Kuzmenko, reviewed by Krisztian Kasa) Closes #5450 --- .../test/queries/positive/merge_iceberg_orc.q | 8 +- .../results/positive/merge_iceberg_orc.q.out | 159 +++++++++++++++++- .../hive/ql/parse/rewrite/MergeRewriter.java | 3 +- .../rewrite/sql/MultiInsertSqlGenerator.java | 23 +-- 4 files changed, 178 insertions(+), 15 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_orc.q b/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_orc.q index 203fdc2598cd..6b14261a1a14 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_orc.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_orc.q @@ -1,7 +1,7 @@ -- SORT_QUERY_RESULTS set hive.explain.user=false; -create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2'); +create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read'); create table source(a int, b string, c int); insert into target_ice values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55), (333, 'two', 56); @@ -20,3 +20,9 @@ when matched then update set b = 'Merged', c = t.c + 10 when not matched then insert values (src.a, src.b, src.c); select * from target_ice; + +create external table target_ice2(a int, `date` string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read'); +-- reserved keywords +explain +merge into target_ice2 as t using source src ON t.a = src.a +when not matched then insert (a, `date`) values (src.a, concat(src.b, '-merge')); \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out index dceb9986e24a..5e5348e8c2ae 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out @@ -1,8 +1,8 @@ -PREHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2') +PREHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read') PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@target_ice -POSTHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2') +POSTHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read') POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@target_ice @@ -324,3 +324,158 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 333 two 56 4 four 53 5 five 54 +PREHOOK: query: create external table target_ice2(a int, `date` string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: 
default@target_ice2 +POSTHOOK: query: create external table target_ice2(a int, `date` string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@target_ice2 +PREHOOK: query: explain +merge into target_ice2 as t using source src ON t.a = src.a +when not matched then insert (a, `date`) values (src.a, concat(src.b, '-merge')) +PREHOOK: type: QUERY +PREHOOK: Input: default@source +PREHOOK: Input: default@target_ice2 +PREHOOK: Output: default@target_ice2 +POSTHOOK: query: explain +merge into target_ice2 as t using source src ON t.a = src.a +when not matched then insert (a, `date`) values (src.a, concat(src.b, '-merge')) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@source +POSTHOOK: Input: default@target_ice2 +POSTHOOK: Output: default@target_ice2 +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: target_ice2 + filterExpr: a is not null (type: boolean) + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: a is not null (type: boolean) + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: a (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized + Map 4 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 6 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: a (type: int), b (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 6 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Merge Join Operator + condition map: + Right Outer Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 6 Data size: 607 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: _col0 is null (type: boolean) + Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col1 (type: int), concat(_col2, '-merge') (type: string), null (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: 
org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.target_ice2 + Select Operator + expressions: _col0 (type: int), _col1 (type: string), null (type: int) + outputColumnNames: a, date, c + Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: min(a), max(a), count(1), count(a), compute_bit_vector_hll(a), max(length(date)), avg(COALESCE(length(date),0)), count(date), compute_bit_vector_hll(date), min(c), max(c), count(c), compute_bit_vector_hll(c) + minReductionHashAggr: 0.99 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary) + Reducer 3 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17 + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + replace: false + table: + input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat + output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat + serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe + name: default.target_ice2 + + Stage: Stage-3 + Stats Work + Basic Stats Work: + Column Stats Desc: + Columns: a, date, c + Column Types: int, string, int + 
Table: default.target_ice2
+
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java
index 47d1fb8fc6e4..15b5cb6150a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/MergeRewriter.java
@@ -40,6 +40,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.function.UnaryOperator;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -179,7 +180,7 @@ public void appendWhenNotMatchedInsertClause(MergeStatement.InsertClause insertC
     sqlGenerator.append("INSERT INTO ").append(mergeStatement.getTargetName());
     if (insertClause.getColumnList() != null) {
       sqlGenerator.append(" (");
-      sqlGenerator.append(String.join(",", insertClause.getColumnList()));
+      sqlGenerator.appendCols(insertClause.getColumnList(), Function.identity());
       sqlGenerator.append(')');
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/sql/MultiInsertSqlGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/sql/MultiInsertSqlGenerator.java
index 554b6d394ee5..ddd57a6dd074 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/sql/MultiInsertSqlGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/rewrite/sql/MultiInsertSqlGenerator.java
@@ -27,6 +27,7 @@
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Deque;
+import java.util.function.Function;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -119,7 +120,7 @@ public void appendPartitionCols(Table table) {
       return;
     }
     queryStr.append(" partition (");
-    appendCols(partCols);
+    appendCols(partCols, FieldSchema::getName);
     queryStr.append(")");
   }
 
@@ -155,22 +156,22 @@ public void appendPartColsOfTargetTableWithComma(String alias) {
       return;
     }
     queryStr.append(',');
-    appendCols(targetTable.getPartCols(), alias, null);
+    appendCols(targetTable.getPartCols(), alias, null, FieldSchema::getName);
   }
 
   public void appendAllColsOfTargetTable(String prefix) {
-    appendCols(targetTable.getAllCols(), null, prefix);
+    appendCols(targetTable.getAllCols(), null, prefix, FieldSchema::getName);
   }
 
   public void appendAllColsOfTargetTable() {
-    appendCols(targetTable.getAllCols());
+    appendCols(targetTable.getAllCols(), FieldSchema::getName);
   }
-
-  public void appendCols(List<FieldSchema> columns) {
-    appendCols(columns, null, null);
+
+  public <T> void appendCols(List<T> columns, Function<T, String> stringConverter) {
+    appendCols(columns, null, null, stringConverter);
  }
 
-  public void appendCols(List<FieldSchema> columns, String alias, String prefix) {
+  public <T> void appendCols(List<T> columns, String alias, String prefix, Function<T, String> stringConverter) {
     if (columns == null) {
       return;
     }
@@ -181,7 +182,7 @@ public void appendCols(List<FieldSchema> columns, String alias, String prefix)
 
     boolean first = true;
-    for (FieldSchema fschema : columns) {
+    for (T fschema : columns) {
       if (first) {
         first = false;
       } else {
         queryStr.append(',');
       }
 
       if (quotedAlias != null) {
         queryStr.append(quotedAlias).append('.');
       }
-      queryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+      queryStr.append(HiveUtils.unparseIdentifier(stringConverter.apply(fschema), this.conf));
 
       if (isNotBlank(prefix)) {
         queryStr.append(" AS ");
-        String prefixedIdentifier = HiveUtils.unparseIdentifier(prefix + fschema.getName(), this.conf);
+        String prefixedIdentifier = HiveUtils.unparseIdentifier(prefix + stringConverter.apply(fschema), this.conf);
         queryStr.append(prefixedIdentifier);
       }
     }
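
The substance of the fix: appendWhenNotMatchedInsertClause used to splice the MERGE insert clause's column list into the rewritten multi-insert with String.join(",", ...), bypassing identifier quoting, so a column named date produced rewritten SQL that no longer parsed. The patch generalizes MultiInsertSqlGenerator.appendCols from List<FieldSchema> to List<T> plus a Function<T, String> name extractor, letting MergeRewriter push its plain List<String> through the same HiveUtils.unparseIdentifier path (FieldSchema callers pass FieldSchema::getName, MergeRewriter passes Function.identity()). The standalone sketch below (not Hive code) models that path; quote() is a hypothetical stand-in for HiveUtils.unparseIdentifier, assumed here to backtick-quote identifiers as it does when quoted identifiers are enabled (the default).

  import java.util.List;
  import java.util.function.Function;

  public class AppendColsSketch {
    private final StringBuilder queryStr = new StringBuilder();

    // Hypothetical stand-in for HiveUtils.unparseIdentifier(name, conf):
    // wrap the identifier in backticks, escaping embedded backticks.
    private static String quote(String identifier) {
      return "`" + identifier.replace("`", "``") + "`";
    }

    // Mirrors the patched generic signature: FieldSchema lists pass
    // FieldSchema::getName, while the MERGE insert clause's column list
    // is already a List<String> and passes Function.identity().
    public <T> void appendCols(List<T> columns, Function<T, String> stringConverter) {
      boolean first = true;
      for (T col : columns) {
        if (!first) {
          queryStr.append(',');
        }
        first = false;
        queryStr.append(quote(stringConverter.apply(col)));
      }
    }

    public static void main(String[] args) {
      AppendColsSketch gen = new AppendColsSketch();
      gen.appendCols(List.of("a", "date"), Function.identity());
      // Old path: String.join(",", columns) emitted a,date unquoted,
      // and DATE is a reserved keyword when the rewrite is re-parsed.
      System.out.println("INSERT INTO target_ice2 (" + gen.queryStr + ")");
      // -> INSERT INTO target_ice2 (`a`,`date`)
    }
  }

With this in place, the rewritten multi-insert for the new merge_iceberg_orc.q case should carry INSERT INTO ... (`a`,`date`), which is what lets the EXPLAIN above compile against target_ice2.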