HIVE-28126: Use added record count in cost model when rebuilding materialized view stored by iceberg (Krisztian Kasa, reviewed by Denys Kuzmenko, okumin)
kasakrisz committed Apr 9, 2024
1 parent 28ba6fb commit be857bf
Showing 13 changed files with 845 additions and 57 deletions.
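For orientation before the diff: the commit teaches the materialized-view rebuild cost model to use per-snapshot record counts, so SnapshotContext now carries a write-operation type plus added/deleted row counts. Below is a minimal sketch of the shape implied by the call sites in this diff; the real class lives in org.apache.hadoop.hive.common.type, and anything not exercised here (the OVERWRITE/REPLACE constants, constructor details) is an assumption, not copied source.

// Sketch only: SnapshotContext as implied by this commit's call sites, not the
// actual Hive source. Enum constants other than APPEND/DELETE/UNKNOWN are assumed.
public class SnapshotContext {
  public enum WriteOperationType { APPEND, DELETE, OVERWRITE, REPLACE, UNKNOWN }

  private final long snapshotId;
  private final WriteOperationType operation;
  private final long addedRowCount;
  private final long deletedRowCount;

  public SnapshotContext(long snapshotId) {
    this(snapshotId, WriteOperationType.UNKNOWN, 0, 0);
  }

  public SnapshotContext(long snapshotId, WriteOperationType operation,
      long addedRowCount, long deletedRowCount) {
    this.snapshotId = snapshotId;
    this.operation = operation;
    this.addedRowCount = addedRowCount;
    this.deletedRowCount = deletedRowCount;
  }

  public long getSnapshotId() { return snapshotId; }
  public WriteOperationType getOperation() { return operation; }
  public long getAddedRowCount() { return addedRowCount; }
  public long getDeletedRowCount() { return deletedRowCount; }
}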
@@ -137,6 +137,7 @@
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -1618,7 +1619,58 @@ public SnapshotContext getCurrentSnapshotContext(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
if (current == null) {
return null;
}
- return new SnapshotContext(current.snapshotId());
+ return toSnapshotContext(current);
}

private SnapshotContext toSnapshotContext(Snapshot snapshot) {
Map<String, String> summaryMap = snapshot.summary();
long addedRecords = getLongSummary(summaryMap, SnapshotSummary.ADDED_RECORDS_PROP);
long deletedRecords = getLongSummary(summaryMap, SnapshotSummary.DELETED_RECORDS_PROP);
return new SnapshotContext(
snapshot.snapshotId(), toWriteOperationType(snapshot.operation()), addedRecords, deletedRecords);
}

private SnapshotContext.WriteOperationType toWriteOperationType(String operation) {
try {
return SnapshotContext.WriteOperationType.valueOf(operation.toUpperCase());
} catch (NullPointerException | IllegalArgumentException ex) {
return SnapshotContext.WriteOperationType.UNKNOWN;
}
}

private long getLongSummary(Map<String, String> summaryMap, String key) {
String textValue = summaryMap.get(key);
if (StringUtils.isBlank(textValue)) {
return 0;
}
return Long.parseLong(textValue);
}
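The helpers above pull the record counts out of Iceberg's per-snapshot summary map. For reference, a hedged standalone example of the same lookup; the property keys are the real SnapshotSummary constants ("added-records" / "deleted-records"), while the surrounding class and table wiring are illustrative only.

// Illustrative sketch, not part of the commit: reading a record-count delta
// straight off a table's current snapshot summary.
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

class SnapshotSummaryProbe {
  static long addedRecords(Table table) {
    Snapshot current = table.currentSnapshot(); // null for an empty table
    if (current == null) {
      return 0;
    }
    Map<String, String> summary = current.summary();
    // An append snapshot typically carries e.g. {"added-records" -> "12", ...}.
    return Long.parseLong(summary.getOrDefault(SnapshotSummary.ADDED_RECORDS_PROP, "0"));
  }
}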

@Override
public Iterable<SnapshotContext> getSnapshotContexts(
org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {

TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
return getSnapshots(table.snapshots(), since);
}

@VisibleForTesting
Iterable<SnapshotContext> getSnapshots(Iterable<Snapshot> snapshots, SnapshotContext since) {
List<SnapshotContext> result = Lists.newArrayList();

boolean foundSince = Objects.isNull(since);
for (Snapshot snapshot : snapshots) {
if (!foundSince) {
if (snapshot.snapshotId() == since.getSnapshotId()) {
foundSince = true;
}
} else {
result.add(toSnapshotContext(snapshot));
}
}

return foundSince ? result : Collections.emptyList();
}
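getSnapshots walks the history in snapshot order, skips everything up to and including since, and returns empty when since is missing from the history. Here is a hedged caller-side sketch of how a rebuild cost model can fold the resulting contexts into row-count deltas; handler, mvSourceTable, and lastRebuildSnapshot are illustrative names, not code from this commit.

// Illustrative sketch: totalling row-count deltas since the last MV rebuild.
import org.apache.hadoop.hive.common.type.SnapshotContext;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;

class RowDeltaAggregator {
  static long[] deltasSince(HiveStorageHandler handler,
      org.apache.hadoop.hive.ql.metadata.Table mvSourceTable,
      SnapshotContext lastRebuildSnapshot) {
    long added = 0;
    long deleted = 0;
    for (SnapshotContext ctx : handler.getSnapshotContexts(mvSourceTable, lastRebuildSnapshot)) {
      added += ctx.getAddedRowCount();
      deleted += ctx.getDeletedRowCount();
    }
    // `added` is what the cost model can use in place of the full table cardinality.
    return new long[] {added, deleted};
  }
}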

@Override
@@ -1692,6 +1744,20 @@ public void addResourcesForCreateTable(Map<String, String> tblProps, HiveConf hi
}
}

/**
 * Checks the operation type of all snapshots that are newer than the specified one; the specified snapshot itself is excluded.
 * @deprecated
 * <br>Use {@link HiveStorageHandler#getSnapshotContexts(
 * org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since)}
 * and check {@link SnapshotContext.WriteOperationType#APPEND}.equals({@link SnapshotContext#getOperation()}).
 *
 * @param hmsTable table metadata stored in Hive Metastore
 * @param since the snapshot preceding the oldest snapshot which should be checked;
 *              a null value means all snapshots are checked.
 * @return null if the table is empty, true if all snapshots are {@link SnapshotContext.WriteOperationType#APPEND}s,
 *         false otherwise.
 */
@Deprecated
@Override
public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
@@ -1708,7 +1774,7 @@ Boolean hasAppendsOnly(Iterable<Snapshot> snapshots, SnapshotContext since) {
foundSince = true;
}
} else {
if (!"append".equals(snapshot.operation())) {
if (!DataOperations.APPEND.equals(snapshot.operation())) {
return false;
}
}
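Per the deprecation note above, the same check can be rebuilt on the new API. A minimal sketch follows; it collapses the Boolean null-for-empty contract to a plain boolean, so treat it as the pattern rather than a drop-in replacement.

// Sketch of the replacement the deprecation javadoc recommends.
import org.apache.hadoop.hive.common.type.SnapshotContext;

class AppendsOnlyCheck {
  static boolean appendsOnly(Iterable<SnapshotContext> contexts) {
    for (SnapshotContext ctx : contexts) {
      if (!SnapshotContext.WriteOperationType.APPEND.equals(ctx.getOperation())) {
        return false;
      }
    }
    return true;
  }
}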
@@ -22,8 +22,12 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.IterableUtils;
import org.apache.hadoop.hive.common.type.SnapshotContext;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,10 +55,18 @@ public class TestHiveIcebergStorageHandler {
@Before
public void before() {
when(anySnapshot.snapshotId()).thenReturn(42L);

Mockito.lenient().when(appendSnapshot.snapshotId()).thenReturn(20L);
Map<String, String> summary = Maps.newHashMap();
summary.put(SnapshotSummary.ADDED_RECORDS_PROP, "12");
Mockito.lenient().when(appendSnapshot.summary()).thenReturn(summary);
when(appendSnapshot.operation()).thenReturn("append");

Mockito.lenient().when(deleteSnapshot.snapshotId()).thenReturn(100L);
when(deleteSnapshot.operation()).thenReturn("delete");
summary = Maps.newHashMap();
summary.put(SnapshotSummary.DELETED_RECORDS_PROP, "3");
Mockito.lenient().when(deleteSnapshot.summary()).thenReturn(summary);
}

@Test
@@ -123,4 +135,51 @@ public void testHasAppendsOnlyReturnsNullWhenGivenSnapshotNotInTheList() {

assertThat(result, is(nullValue()));
}

@Test
public void testGetSnapshotContextsReturnsEmptyIterableWhenTableIsEmpty() {
SnapshotContext since = new SnapshotContext(42);

HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
Iterable<SnapshotContext> result = storageHandler.getSnapshots(Collections.emptyList(), since);

assertThat(result.iterator().hasNext(), is(false));
}

@Test
public void testGetSnapshotContextsReturnsEmptyIterableWhenTableIsEmptyAndGivenSnapShotIsNull() {
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
Iterable<SnapshotContext> result = storageHandler.getSnapshots(Collections.emptyList(), null);

assertThat(result.iterator().hasNext(), is(false));
}

@Test
public void testGetSnapshotContextsReturnsAllSnapshotsWhenGivenSnapshotIsNull() {
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
Iterable<SnapshotContext> result = storageHandler.getSnapshots(asList(appendSnapshot, deleteSnapshot), null);

List<SnapshotContext> resultList = IterableUtils.toList(result);
assertThat(resultList.size(), is(2));
assertThat(resultList.get(0).getSnapshotId(), is(appendSnapshot.snapshotId()));
assertThat(resultList.get(0).getOperation(), is(SnapshotContext.WriteOperationType.APPEND));
assertThat(resultList.get(0).getAddedRowCount(), is(12L));
assertThat(resultList.get(0).getDeletedRowCount(), is(0L));

assertThat(resultList.get(1).getSnapshotId(), is(deleteSnapshot.snapshotId()));
assertThat(resultList.get(1).getOperation(), is(SnapshotContext.WriteOperationType.DELETE));
assertThat(resultList.get(1).getAddedRowCount(), is(0L));
assertThat(resultList.get(1).getDeletedRowCount(), is(3L));
}

@Test
public void testGetSnapshotContextsReturnsEmptyIterableWhenGivenSnapshotNotInTheList() {
SnapshotContext since = new SnapshotContext(1);
List<Snapshot> snapshotList = Arrays.asList(anySnapshot, appendSnapshot, deleteSnapshot);

HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
Iterable<SnapshotContext> result = storageHandler.getSnapshots(snapshotList, since);

assertThat(result.iterator().hasNext(), is(false));
}
}
@@ -1,7 +1,9 @@
-- MV source tables are iceberg and MV has aggregate
-- SORT_QUERY_RESULTS
--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
--! qt:replace:/(\s+Version\sinterval\sfrom\:\s+)\d+(\s*)/$1#Masked#/

set hive.explain.user=false;
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

@@ -13,7 +15,7 @@ create external table tbl_ice_v2(d int, e string, f int) stored by iceberg store
insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);

- create materialized view mat1 as
+ create materialized view mat1 stored by iceberg stored as orc tblproperties ('format-version'='2') as
select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
from tbl_ice
join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
@@ -23,8 +25,23 @@ group by tbl_ice.b, tbl_ice.c;
insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54);
insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);

-- Incremental rebuild plan is more expensive than full rebuild
set hive.materializedview.rebuild.incremental.factor=10.0;

explain cbo
alter materialized view mat1 rebuild;
explain
alter materialized view mat1 rebuild;


-- Incremental rebuild plan is cheaper than full rebuild
set hive.materializedview.rebuild.incremental.factor=0.01;

explain cbo
alter materialized view mat1 rebuild;
explain
alter materialized view mat1 rebuild;

alter materialized view mat1 rebuild;

select * from mat1;
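The two settings of hive.materializedview.rebuild.incremental.factor above drive the planner to opposite decisions on the same data. A hedged sketch of the decision rule being exercised; the class, method, and parameter names are hypothetical, and the real comparison lives in Hive's CBO rebuild planning.

// Hypothetical illustration: the incremental plan is chosen only when its cost,
// scaled by the factor, undercuts the full-rebuild cost. A factor of 10.0
// penalizes the incremental plan; 0.01 favors it.
class RebuildChoice {
  static boolean chooseIncremental(double incrementalCost, double fullRebuildCost, double factor) {
    return incrementalCost * factor < fullRebuildCost;
  }
}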
(Diff truncated; the remaining changed files are not shown.)