HIVE-27031: Addendum: Iceberg: Implement Copy-On-Write for Delete queries (Denys Kuzmenko, reviewed by Krisztian Kasa, Butao Zhang)

Closes #4700
deniskuzZ authored Oct 10, 2023
1 parent 076f0ac commit ec51d3b
Showing 18 changed files with 4,014 additions and 111 deletions.
16 changes: 12 additions & 4 deletions data/conf/iceberg/llap/tez-site.xml
@@ -1,8 +1,4 @@
<configuration>
<property>
<name>tez.am.resource.memory.mb</name>
<value>128</value>
</property>
<property>
<name>tez.am.dag.scheduler.class</name>
<value>org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled</value>
@@ -11,4 +7,16 @@
<name>tez.am.resource.memory.mb</name>
<value>256</value>
</property>
<property>
<name>tez.runtime.io.sort.mb</name>
<value>24</value>
</property>
<property>
<name>hive.tez.container.size</name>
<value>512</value>
</property>
<property>
<name>tez.counters.max</name>
<value>1024</value>
</property>
</configuration>
8 changes: 8 additions & 0 deletions data/conf/iceberg/tez/tez-site.xml
Original file line number Diff line number Diff line change
@@ -7,8 +7,16 @@
<name>tez.am.resource.memory.mb</name>
<value>256</value>
</property>
<property>
<name>tez.runtime.io.sort.mb</name>
<value>24</value>
</property>
<property>
<name>hive.tez.container.size</name>
<value>512</value>
</property>
<property>
<name>tez.counters.max</name>
<value>1024</value>
</property>
</configuration>
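For reference, a minimal sketch (not part of this commit) of how the added test settings could be read back through the Hadoop Configuration API once the updated tez-site.xml is on the classpath; the defaults below simply mirror the values introduced above.

import org.apache.hadoop.conf.Configuration;

public class TezTestConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.addResource("tez-site.xml");            // picked up from the test classpath

    int amMemoryMb  = conf.getInt("tez.am.resource.memory.mb", 256);
    int sortMb      = conf.getInt("tez.runtime.io.sort.mb", 24);
    int containerMb = conf.getInt("hive.tez.container.size", 512);
    int maxCounters = conf.getInt("tez.counters.max", 1024);

    System.out.printf("am=%d sort=%d container=%d counters=%d%n",
        amMemoryMb, sortMb, containerMb, maxCounters);
  }
}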
@@ -33,10 +33,17 @@ public class FilesForCommit implements Serializable {

private final Collection<DataFile> dataFiles;
private final Collection<DeleteFile> deleteFiles;
private Collection<DataFile> referencedDataFiles;

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
this(dataFiles, deleteFiles, Collections.emptyList());
}

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
Collection<DataFile> referencedDataFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.referencedDataFiles = referencedDataFiles;
}

public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
@@ -47,6 +54,10 @@ public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
return new FilesForCommit(dataFiles, Collections.emptyList());
}

public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> referencedDataFiles) {
return new FilesForCommit(dataFiles, Collections.emptyList(), referencedDataFiles);
}

public static FilesForCommit empty() {
return new FilesForCommit(Collections.emptyList(), Collections.emptyList());
}
@@ -59,19 +70,25 @@ public Collection<DeleteFile> deleteFiles() {
return deleteFiles;
}

public Collection<DataFile> referencedDataFiles() {
return referencedDataFiles;
}

public Collection<? extends ContentFile> allFiles() {
return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList());
}

public boolean isEmpty() {
return dataFiles.isEmpty() && deleteFiles.isEmpty();
return dataFiles.isEmpty() && deleteFiles.isEmpty() && referencedDataFiles.isEmpty();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("dataFiles", dataFiles.toString())
.add("deleteFiles", deleteFiles.toString())
.add("referencedDataFiles", referencedDataFiles.toString())
.toString();
}

}
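A minimal sketch (not part of the commit) of how the new referencedDataFiles field is meant to be used: rewritten files go in as data files, while the original files they replace are carried as referenced files and surface in toString() and isEmpty(). The one-column schema and file paths below are hypothetical.

import java.util.Collections;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.types.Types;

public class FilesForCommitSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "a", Types.IntegerType.get()));
    PartitionSpec spec = PartitionSpec.unpartitioned();

    DataFile rewritten = DataFiles.builder(spec)
        .withPath("/warehouse/tbl_ice/data/new-00001.parquet")
        .withFileSizeInBytes(1024)
        .withRecordCount(4)
        .build();
    DataFile replaced = DataFiles.builder(spec)
        .withPath("/warehouse/tbl_ice/data/old-00001.parquet")
        .withFileSizeInBytes(0)          // size/count are irrelevant for a pure reference
        .withRecordCount(0)
        .build();

    FilesForCommit files = FilesForCommit.onlyData(
        Collections.singletonList(rewritten), Collections.singletonList(replaced));
    System.out.println(files);           // toString() now lists referencedDataFiles as well
  }
}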
@@ -56,6 +56,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
@@ -140,12 +141,15 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
if (writers.get(output) != null) {
Collection<DataFile> dataFiles = Lists.newArrayList();
Collection<DeleteFile> deleteFiles = Lists.newArrayList();
Collection<DataFile> referencedDataFiles = Lists.newArrayList();
for (HiveIcebergWriter writer : writers.get(output)) {
FilesForCommit files = writer.files();
dataFiles.addAll(files.dataFiles());
deleteFiles.addAll(files.deleteFiles());
referencedDataFiles.addAll(files.referencedDataFiles());
}
createFileForCommit(new FilesForCommit(dataFiles, deleteFiles), fileForCommitLocation, table.io());
createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles),
fileForCommitLocation, table.io());
} else {
LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
createFileForCommit(FilesForCommit.empty(), fileForCommitLocation, table.io());
@@ -405,6 +409,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
}
List<DataFile> dataFiles = Lists.newArrayList();
List<DeleteFile> deleteFiles = Lists.newArrayList();
List<DataFile> referencedDataFiles = Lists.newArrayList();

Table table = null;
String branchName = null;
@@ -431,9 +436,10 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
numTasks, executor, outputTable.table.location(), jobContext, io, true);
dataFiles.addAll(writeResults.dataFiles());
deleteFiles.addAll(writeResults.deleteFiles());
referencedDataFiles.addAll(writeResults.referencedDataFiles());
}

FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles);
FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
long startTime = System.currentTimeMillis();

if (Operation.IOW != operation) {
@@ -470,6 +476,21 @@ private Long getSnapshotId(Table table, String branchName) {
private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
FilesForCommit results, Operation operation) {

if (!results.referencedDataFiles().isEmpty()) {
OverwriteFiles write = table.newOverwrite();
results.referencedDataFiles().forEach(write::deleteFile);
results.dataFiles().forEach(write::addFile);

if (StringUtils.isNotEmpty(branchName)) {
write.toBranch(HiveUtils.getTableSnapshotRef(branchName));
}
if (snapshotId != null) {
write.validateFromSnapshot(snapshotId);
}
write.commit();
return;
}

if (results.deleteFiles().isEmpty() && Operation.MERGE != operation) {
AppendFiles write = table.newAppend();
results.dataFiles().forEach(write::appendFile);
@@ -620,6 +641,7 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
// starting from 0.
Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
Collection<DataFile> referencedDataFiles = new ConcurrentLinkedQueue<>();
Tasks.range(numTasks)
.throwFailureWhenFinished(throwOnFailure)
.executeWith(executor)
@@ -629,9 +651,11 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
FilesForCommit files = readFileForCommit(taskFileName, io);
dataFiles.addAll(files.dataFiles());
deleteFiles.addAll(files.deleteFiles());
referencedDataFiles.addAll(files.referencedDataFiles());

});

return new FilesForCommit(dataFiles, deleteFiles);
return new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
}

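A condensed sketch of the commit path added above, assuming an existing Iceberg Table handle and the FilesForCommit produced by the copy-on-write writer: the replaced files are dropped and the rewritten files added in a single overwrite snapshot, optionally validated against the snapshot the job read from.

import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.FilesForCommit;

public class CopyOnWriteCommitSketch {
  static void commitCopyOnWrite(Table table, FilesForCommit results, Long fromSnapshotId) {
    OverwriteFiles overwrite = table.newOverwrite();
    results.referencedDataFiles().forEach(overwrite::deleteFile);  // files being replaced
    results.dataFiles().forEach(overwrite::addFile);               // rewritten surviving rows
    if (fromSnapshotId != null) {
      overwrite.validateFromSnapshot(fromSnapshotId);              // baseline for conflict detection
    }
    overwrite.commit();                                            // one atomic snapshot
  }
}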
@@ -370,7 +370,18 @@ public String toString() {
public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc exprNodeDesc) {
DecomposedPredicate predicate = new DecomposedPredicate();
predicate.residualPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc;
ExprNodeDesc pushedPredicate = exprNodeDesc.clone();

List<ExprNodeDesc> subExprNodes = pushedPredicate.getChildren();
if (subExprNodes.removeIf(nodeDesc -> nodeDesc.getCols() != null &&
nodeDesc.getCols().contains(VirtualColumn.FILE_PATH.getName()))) {
if (subExprNodes.size() == 1) {
pushedPredicate = subExprNodes.get(0);
} else if (subExprNodes.isEmpty()) {
pushedPredicate = null;
}
}
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
return predicate;
}

@@ -1074,6 +1085,12 @@ public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Ta
}
}

@Override
public FieldSchema getRowId() {
VirtualColumn rowId = VirtualColumn.ROW_POSITION;
return new FieldSchema(rowId.getName(), rowId.getTypeInfo().getTypeName(), "");
}

@Override
public List<FieldSchema> acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
switch (operation) {
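The decomposition above keeps the full expression as the residual predicate and strips any conjunct that touches the file-path virtual column from the pushed predicate, presumably because that column exists only in the copy-on-write rewrite and is not part of the Iceberg table schema. A deliberately simplified illustration of that split, using plain strings instead of ExprNodeDesc and a hypothetical column/file name:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PushedPredicateSketch {
  public static void main(String[] args) {
    // Full DELETE predicate as a list of AND-ed conjuncts.
    List<String> residual = new ArrayList<>(Arrays.asList(
        "b IN ('one', 'four')",             // evaluable by Iceberg -> pushable
        "FILE__PATH = 'data-00001'"));      // assumed virtual-column conjunct -> residual only

    List<String> pushed = new ArrayList<>(residual);
    pushed.removeIf(c -> c.contains("FILE__PATH"));

    System.out.println("pushed   = " + pushed);    // [b IN ('one', 'four')]
    System.out.println("residual = " + residual);  // Hive still evaluates the full predicate
  }
}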
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive.writer;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {

private final int currentSpecId;

private final GenericRecord rowDataTemplate;
private final List<DataFile> referencedDataFiles;

HiveIcebergCopyOnWriteRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
long targetFileSize) {
super(schema, specs, io,
new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
this.currentSpecId = currentSpecId;
this.rowDataTemplate = GenericRecord.create(schema);
this.referencedDataFiles = Lists.newArrayList();
}

@Override
public void write(Writable row) throws IOException {
Record record = ((Container<Record>) row).get();
PositionDelete<Record> positionDelete = IcebergAcidUtil.getPositionDelete(record, rowDataTemplate);
int specId = IcebergAcidUtil.parseSpecId(record);
Record rowData = positionDelete.row();

if (positionDelete.pos() < 0) {
DataFile dataFile =
DataFiles.builder(specs.get(specId))
.withPath(positionDelete.path().toString())
.withPartition(partition(rowData, specId))
.withFileSizeInBytes(0)
.withRecordCount(0)
.build();
referencedDataFiles.add(dataFile);
} else {
writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
}
}

@Override
public FilesForCommit files() {
List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
return FilesForCommit.onlyData(dataFiles, referencedDataFiles);
}
}
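What appears to drive the two branches in write() above: besides the surviving rows (carrying their real file position), the rewritten DELETE plan emits marker records whose position is negative, and those only identify an affected data file to be recorded for removal. A small illustration of that convention; the schema, record values and file path are hypothetical.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.types.Types;

public class CopyOnWriteRowsSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "a", Types.IntegerType.get()));
    Record survivor = GenericRecord.create(schema);
    survivor.setField("a", 2);

    PositionDelete<Record> keepRow = PositionDelete.create();
    keepRow.set("/warehouse/tbl_ice/data/old-00001.parquet", 42L, survivor);    // rewritten into a new file
    PositionDelete<Record> fileMarker = PositionDelete.create();
    fileMarker.set("/warehouse/tbl_ice/data/old-00001.parquet", -1L, survivor); // file reference only

    System.out.println(keepRow.pos() >= 0);   // true  -> written by the ClusteredDataWriter
    System.out.println(fileMarker.pos() < 0); // true  -> collected as a referencedDataFile
  }
}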
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -120,11 +121,19 @@ public HiveIcebergWriter build() {
new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null, deleteFileFormat, null, null, null,
skipRowData ? null : dataSchema);

boolean copyOnWriteMode = RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(
properties.get(TableProperties.DELETE_MODE)) && operation == Operation.DELETE;

HiveIcebergWriter writer;
switch (operation) {
case DELETE:
writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
if (copyOnWriteMode) {
writer = new HiveIcebergCopyOnWriteRecordWriter(dataSchema, specs, currentSpecId, writerFactory,
outputFileFactory, io, targetFileSize);
} else {
writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
io, targetFileSize, skipRowData);
}
break;
case OTHER:
writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
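A condensed sketch of the mode check above, assuming only the table's property map and a flag for the DELETE operation are at hand: with 'write.delete.mode' = 'copy-on-write' (as set in the new q-test below) the copy-on-write writer is selected, otherwise the existing position-delete writer is kept.

import java.util.Map;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.TableProperties;

public class DeleteModeSketch {
  static boolean isCopyOnWriteDelete(Map<String, String> properties, boolean isDelete) {
    String mode = properties.getOrDefault(
        TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT);
    return isDelete && RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(mode);
  }
}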
@@ -0,0 +1,40 @@
set hive.explain.user=false;

drop table if exists tbl_ice;
create external table tbl_ice(a int, b string, c int) partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg tblproperties ('format-version'='2', 'write.delete.mode'='copy-on-write');

-- delete using simple predicates
insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54), (111, 'one', 55), (333, 'two', 56);
explain delete from tbl_ice where b in ('one', 'four') or a = 22;

delete from tbl_ice where b in ('one', 'four') or a = 22;
select * from tbl_ice order by a;
-- (2, 'two', 51), (3, 'three', 52), (5, 'five', 54), (333, 'two', 56)

-- delete using subqueries referencing the same table
insert into tbl_ice values (444, 'hola', 800), (555, 'schola', 801);
explain delete from tbl_ice where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800);

delete from tbl_ice where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800);
select * from tbl_ice order by a;
-- (333, 'two', 56), (444, 'hola', 800)

-- delete using a join subquery between the same table & another table
drop table if exists tbl_ice_other;
create external table tbl_ice_other(a int, b string) stored by iceberg;
insert into tbl_ice_other values (10, 'ten'), (333, 'hundred');
explain delete from tbl_ice where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a);

delete from tbl_ice where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a);
select * from tbl_ice order by a;
-- (444, 'hola', 800)

-- delete using a join subquery between the same table & a non-iceberg table
drop table if exists tbl_standard_other;
create external table tbl_standard_other(a int, b string) stored as orc;
insert into tbl_standard_other values (10, 'ten'), (444, 'tutu');
explain delete from tbl_ice where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a);

delete from tbl_ice where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a);
select count(*) from tbl_ice;
-- 0