HIVE-28282: Addendum: Iceberg: Merge-Insert with reserved keywords as column names (Denys Kuzmenko, reviewed by Krisztian Kasa)

Closes apache#5450
deniskuzZ authored and dengzhhu653 committed Sep 24, 2024
1 parent ccddcdc commit bd2970f
Showing 4 changed files with 178 additions and 15 deletions.
[File 1 of 4: qtest script]
@@ -1,7 +1,7 @@
-- SORT_QUERY_RESULTS
set hive.explain.user=false;

-create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read');
create table source(a int, b string, c int);

insert into target_ice values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55), (333, 'two', 56);
@@ -20,3 +20,9 @@ when matched then update set b = 'Merged', c = t.c + 10
when not matched then insert values (src.a, src.b, src.c);

select * from target_ice;

+create external table target_ice2(a int, `date` string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read');
+-- reserved keywords
+explain
+merge into target_ice2 as t using source src ON t.a = src.a
+when not matched then insert (a, `date`) values (src.a, concat(src.b, '-merge'));
[File 2 of 4: qtest golden output]
@@ -1,8 +1,8 @@
-PREHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@target_ice
-POSTHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: query: create external table target_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@target_ice
@@ -324,3 +324,158 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
333 two 56
4 four 53
5 five 54
PREHOOK: query: create external table target_ice2(a int, `date` string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@target_ice2
POSTHOOK: query: create external table target_ice2(a int, `date` string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2', 'write.merge.mode'='merge-on-read')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@target_ice2
PREHOOK: query: explain
merge into target_ice2 as t using source src ON t.a = src.a
when not matched then insert (a, `date`) values (src.a, concat(src.b, '-merge'))
PREHOOK: type: QUERY
PREHOOK: Input: default@source
PREHOOK: Input: default@target_ice2
PREHOOK: Output: default@target_ice2
POSTHOOK: query: explain
merge into target_ice2 as t using source src ON t.a = src.a
when not matched then insert (a, `date`) values (src.a, concat(src.b, '-merge'))
POSTHOOK: type: QUERY
POSTHOOK: Input: default@source
POSTHOOK: Input: default@target_ice2
POSTHOOK: Output: default@target_ice2
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: target_ice2
filterExpr: a is not null (type: boolean)
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: a is not null (type: boolean)
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: a (type: int)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: int)
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
Execution mode: vectorized
Map 4
Map Operator Tree:
TableScan
alias: src
Statistics: Num rows: 6 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: a (type: int), b (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 6 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: int)
null sort order: z
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 6 Data size: 552 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
Execution mode: vectorized
Reducer 2
Reduce Operator Tree:
Merge Join Operator
condition map:
Right Outer Join 0 to 1
keys:
0 _col0 (type: int)
1 _col0 (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 6 Data size: 607 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: _col0 is null (type: boolean)
Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col1 (type: int), concat(_col2, '-merge') (type: string), null (type: int)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice2
Select Operator
expressions: _col0 (type: int), _col1 (type: string), null (type: int)
outputColumnNames: a, date, c
Statistics: Num rows: 3 Data size: 303 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: min(a), max(a), count(1), count(a), compute_bit_vector_hll(a), max(length(date)), avg(COALESCE(length(date),0)), count(date), compute_bit_vector_hll(date), min(c), max(c), count(c), compute_bit_vector_hll(c)
minReductionHashAggr: 0.99
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
null sort order:
sort order:
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint), _col3 (type: bigint), _col4 (type: binary), _col5 (type: int), _col6 (type: struct<count:bigint,sum:double,input:int>), _col7 (type: bigint), _col8 (type: binary), _col9 (type: int), _col10 (type: int), _col11 (type: bigint), _col12 (type: binary)
Reducer 3
Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1), count(VALUE._col2), count(VALUE._col3), compute_bit_vector_hll(VALUE._col4), max(VALUE._col5), avg(VALUE._col6), count(VALUE._col7), compute_bit_vector_hll(VALUE._col8), min(VALUE._col9), max(VALUE._col10), count(VALUE._col11), compute_bit_vector_hll(VALUE._col12)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: 'LONG' (type: string), UDFToLong(_col0) (type: bigint), UDFToLong(_col1) (type: bigint), (_col2 - _col3) (type: bigint), COALESCE(ndv_compute_bit_vector(_col4),0) (type: bigint), _col4 (type: binary), 'STRING' (type: string), UDFToLong(COALESCE(_col5,0)) (type: bigint), COALESCE(_col6,0) (type: double), (_col2 - _col7) (type: bigint), COALESCE(ndv_compute_bit_vector(_col8),0) (type: bigint), _col8 (type: binary), 'LONG' (type: string), UDFToLong(_col9) (type: bigint), UDFToLong(_col10) (type: bigint), (_col2 - _col11) (type: bigint), COALESCE(ndv_compute_bit_vector(_col12),0) (type: bigint), _col12 (type: binary)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 560 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-2
Dependency Collection

Stage: Stage-0
Move Operator
tables:
replace: false
table:
input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice2

Stage: Stage-3
Stats Work
Basic Stats Work:
Column Stats Desc:
Columns: a, date, c
Column Types: int, string, int
Table: default.target_ice2
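Reading the plan: the WHEN NOT MATCHED branch compiles to a right outer join of target and source on a, followed by a filter that keeps rows whose target-side key is null, i.e. source rows with no match; an anti-join in effect. A toy model of that evaluation order (plain Java with made-up keys, nothing Hive-specific):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of the plan's Right Outer Join + "_col0 is null" filter:
// keep source rows that found no target match (the NOT MATCHED branch).
public class AntiJoinSketch {
  public static void main(String[] args) {
    Set<Integer> targetKeys = Set.of(1, 2, 3);        // made-up target keys
    List<Integer> sourceKeys = List.of(2, 3, 4, 5);   // made-up source keys

    List<Integer> notMatched = sourceKeys.stream()
        .filter(k -> !targetKeys.contains(k))  // target side is null after the outer join
        .collect(Collectors.toList());

    System.out.println(notMatched); // [4, 5] -> the rows the MERGE will INSERT
  }
}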

[File 3 of 4: Java source]
@@ -40,6 +40,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -179,7 +180,7 @@ public void appendWhenNotMatchedInsertClause(MergeStatement.InsertClause insertC
sqlGenerator.append("INSERT INTO ").append(mergeStatement.getTargetName());
if (insertClause.getColumnList() != null) {
sqlGenerator.append(" (");
-sqlGenerator.append(String.join(",", insertClause.getColumnList()));
+sqlGenerator.appendCols(insertClause.getColumnList(), Function.identity());
sqlGenerator.append(')');
}

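For context, the one-line change above is the heart of this addendum. String.join emitted the MERGE insert-clause column list verbatim, so a reserved keyword such as `date` reached the rewritten query unquoted and failed to parse; appendCols with Function.identity() runs every name through HiveUtils.unparseIdentifier instead. A minimal self-contained sketch of the difference (hypothetical helper with backtick quoting assumed, not Hive's actual classes):

import java.util.List;

// Stand-alone sketch, not Hive code: why joining raw column names breaks
// when a reserved keyword such as `date` is used as a column name.
public class ReservedColsSketch {

  // Simplified stand-in for HiveUtils.unparseIdentifier, assuming backtick quoting.
  static String quote(String identifier) {
    return "`" + identifier.replace("`", "``") + "`";
  }

  public static void main(String[] args) {
    List<String> insertCols = List.of("a", "date");

    // Before the fix: names joined verbatim, leaving the bare keyword `date`.
    System.out.println("INSERT INTO target_ice2 (" + String.join(",", insertCols) + ")");
    // -> INSERT INTO target_ice2 (a,date)   ... fails to parse

    // After the fix: each name is quoted, mirroring
    // appendCols(insertClause.getColumnList(), Function.identity()).
    StringBuilder quoted = new StringBuilder();
    for (String col : insertCols) {
      if (quoted.length() > 0) {
        quoted.append(',');
      }
      quoted.append(quote(col));
    }
    System.out.println("INSERT INTO target_ice2 (" + quoted + ")");
    // -> INSERT INTO target_ice2 (`a`,`date`)
  }
}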
[File 4 of 4: Java source]
@@ -27,6 +27,7 @@
import java.util.ArrayDeque;
import java.util.List;
import java.util.Deque;
+import java.util.function.Function;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -119,7 +120,7 @@ public void appendPartitionCols(Table table) {
return;
}
queryStr.append(" partition (");
-appendCols(partCols);
+appendCols(partCols, FieldSchema::getName);
queryStr.append(")");
}

@@ -155,22 +156,22 @@ public void appendPartColsOfTargetTableWithComma(String alias) {
return;
}
queryStr.append(',');
-appendCols(targetTable.getPartCols(), alias, null);
+appendCols(targetTable.getPartCols(), alias, null, FieldSchema::getName);
}

public void appendAllColsOfTargetTable(String prefix) {
-appendCols(targetTable.getAllCols(), null, prefix);
+appendCols(targetTable.getAllCols(), null, prefix, FieldSchema::getName);
}

public void appendAllColsOfTargetTable() {
-appendCols(targetTable.getAllCols());
+appendCols(targetTable.getAllCols(), FieldSchema::getName);
}

-public void appendCols(List<FieldSchema> columns) {
-appendCols(columns, null, null);
+public <T> void appendCols(List<T> columns, Function<T, String> stringConverter) {
+appendCols(columns, null, null, stringConverter);
}

-public void appendCols(List<FieldSchema> columns, String alias, String prefix) {
+public <T> void appendCols(List<T> columns, String alias, String prefix, Function<T, String> stringConverter) {
if (columns == null) {
return;
}
@@ -181,7 +182,7 @@ public void appendCols(List<FieldSchema> columns, String alias, String prefix) {
}

boolean first = true;
-for (FieldSchema fschema : columns) {
+for (T fschema : columns) {
if (first) {
first = false;
} else {
@@ -191,11 +192,11 @@
if (quotedAlias != null) {
queryStr.append(quotedAlias).append('.');
}
-queryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+queryStr.append(HiveUtils.unparseIdentifier(stringConverter.apply(fschema), this.conf));

if (isNotBlank(prefix)) {
queryStr.append(" AS ");
-String prefixedIdentifier = HiveUtils.unparseIdentifier(prefix + fschema.getName(), this.conf);
+String prefixedIdentifier = HiveUtils.unparseIdentifier(prefix + stringConverter.apply(fschema), this.conf);
queryStr.append(prefixedIdentifier);
}
}
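The rest of this file generifies appendCols from List<FieldSchema> to List<T> plus a name extractor, so a single quoting path serves both metastore columns (FieldSchema::getName) and the raw String names of an insert clause (Function.identity()). A rough sketch of the pattern (illustrative class, not the real generator):

import java.util.List;
import java.util.function.Function;

// Illustrative sketch of the generic-converter pattern, not Hive's class:
// one routine quotes every identifier, whatever the element type.
class ColumnListBuilder {
  private final StringBuilder queryStr = new StringBuilder();

  <T> void appendCols(List<T> columns, Function<T, String> toName) {
    boolean first = true;
    for (T column : columns) {
      if (!first) {
        queryStr.append(',');
      }
      first = false;
      // quoting happens in exactly one place for all callers
      queryStr.append('`').append(toName.apply(column)).append('`');
    }
  }

  @Override
  public String toString() {
    return queryStr.toString();
  }
}

Callers that hold FieldSchema lists pass FieldSchema::getName; the merge rewriter, which already holds plain strings, passes Function.identity(), so reserved keywords are quoted on both paths.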
