HIVE-28510: Iceberg: FanoutPositionOnlyDeleteWriter support for unordered position deletes (Denys Kuzmenko, reviewed by Krisztian Kasa, Sourabh Badhya)

Closes #5418
deniskuzZ authored Sep 23, 2024
1 parent e31811b commit 2b6aa63
Showing 44 changed files with 853 additions and 999 deletions.
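
For context: after this change, DELETEs that emit position deletes no longer need their rows globally sorted by partition spec id, partition hash, file path and row position. When the table property write.fanout.enabled is true (the default), Hive skips that sort and writes through Iceberg's fanout position-delete writer, which keeps one open writer per partition and therefore accepts unordered input. The sketch below only illustrates the idea; it is not the Hive code, it assumes the four-argument constructors of Iceberg's ClusteredPositionDeleteWriter and FanoutPositionOnlyDeleteWriter, and the parameter names are illustrative.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FanoutPositionOnlyDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;

class PositionDeleteWriterSketch {

  // Pick the delete writer based on whether the incoming rows are already ordered.
  static PartitioningWriter<PositionDelete<Record>, DeleteWriteResult> newPositionDeleteWriter(
      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io,
      long targetFileSize, boolean hasOrdering) {
    return hasOrdering
        // clustered writer: expects input sorted by spec_id, partition_hash, file_path, row_position
        ? new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSize)
        // fanout writer: buffers one delete file per partition, so unordered input is fine
        : new FanoutPositionOnlyDeleteWriter<>(writerFactory, fileFactory, io, targetFileSize);
  }

  // A position delete records only the data file path and the row position; no row data is kept.
  static void markDeleted(PartitioningWriter<PositionDelete<Record>, DeleteWriteResult> writer,
      PartitionSpec spec, StructLike partition, CharSequence dataFilePath, long rowPosition) {
    PositionDelete<Record> delete = PositionDelete.create();
    writer.write(delete.set(dataFilePath, rowPosition, null), spec, partition);
  }
}

The diff below wires the same choice through WriterBuilder.hasOrdering(...) and the new write.fanout.enabled table property, and drops the DELETE-side sort columns when fanout is enabled.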
1 change: 1 addition & 0 deletions iceberg/checkstyle/checkstyle.xml
@@ -119,6 +119,7 @@
java.util.Collections.*,
java.util.stream.Collectors.*,
org.apache.commons.lang3.Validate.*,
org.apache.hadoop.hive.ql.metadata.VirtualColumn.*,
org.apache.iceberg.expressions.Expressions.*,
org.apache.iceberg.expressions.Expression.Operation.*,
org.apache.iceberg.IsolationLevel.*,
@@ -52,6 +52,7 @@ private InputFormatConfig() {
public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
public static final String LOCALITY = "iceberg.mr.locality";
public static final String WRITE_FANOUT_ENABLED = "write.fanout.enabled";

public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";

Large diffs are not rendered by default.

@@ -41,10 +41,8 @@
import org.apache.iceberg.mr.mapred.Container;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container<Record>>,
public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, Container<Record>>,
HiveOutputFormat<NullWritable, Container<Record>> {
private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;

@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
@@ -68,16 +66,14 @@ private static HiveIcebergWriter writer(JobConf jc) {
// It gets the config from the FileSinkOperator which has its own config for every target table
Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
String tableName = jc.get(Catalogs.NAME);
int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);

setWriterLevelConfiguration(jc, table);
return WriterBuilder.builderFor(table)
.queryId(jc.get(HiveConf.ConfVars.HIVE_QUERY_ID.varname))
.tableName(tableName)
.attemptID(taskAttemptID)
.poolSize(poolSize)
.operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc::get, tableName))
.isFanoutEnabled(!HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc::get, tableName))
.hasOrdering(HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc::get, tableName))
.isMergeTask(HiveCustomStorageHandlerUtils.isMergeTaskEnabled(jc::get, tableName))
.build();
}
@@ -175,7 +175,8 @@ private static Schema projectedSchema(Configuration configuration, String tableN
return projectedSchema;
}
}
if (IcebergTableUtil.isCopyOnWriteMode(operation, configuration::get)) {
boolean isCOW = IcebergTableUtil.isCopyOnWriteMode(operation, configuration::get);
if (isCOW) {
return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
}
switch (operation) {
@@ -209,6 +209,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.FILE_PATH;
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.PARTITION_HASH;
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.PARTITION_PROJECTION;
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.PARTITION_SPEC_ID;
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.ROW_POSITION;

public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class);

@@ -222,13 +228,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H

public static final String TABLE_DEFAULT_LOCATION = "TABLE_DEFAULT_LOCATION";

private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(VirtualColumn.PARTITION_SPEC_ID,
VirtualColumn.PARTITION_HASH, VirtualColumn.FILE_PATH, VirtualColumn.ROW_POSITION,
VirtualColumn.PARTITION_PROJECTION);
private static final List<FieldSchema> ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = ACID_VIRTUAL_COLS.stream()
.map(v -> new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), ""))
.collect(Collectors.toList());
private static final List<VirtualColumn> ACID_VIRTUAL_COLS = ImmutableList.of(
PARTITION_SPEC_ID, PARTITION_HASH, FILE_PATH, ROW_POSITION, PARTITION_PROJECTION);

private static final List<FieldSchema> ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA = schema(ACID_VIRTUAL_COLS);

private static final List<FieldSchema> POSITION_DELETE_ORDERING =
orderBy(PARTITION_SPEC_ID, PARTITION_HASH, FILE_PATH, ROW_POSITION);

private static final List<FieldSchema> EMPTY_ORDERING = ImmutableList.of();

private Configuration conf;

@@ -1208,19 +1216,22 @@ public List<FieldSchema> acidSelectColumns(org.apache.hadoop.hive.ql.metadata.Ta

@Override
public FieldSchema getRowId() {
VirtualColumn rowId = VirtualColumn.ROW_POSITION;
VirtualColumn rowId = ROW_POSITION;
return new FieldSchema(rowId.getName(), rowId.getTypeInfo().getTypeName(), "");
}

@Override
public List<FieldSchema> acidSortColumns(org.apache.hadoop.hive.ql.metadata.Table table, Operation operation) {
switch (operation) {
case DELETE:
return ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA;
return IcebergTableUtil.isFanoutEnabled(table.getParameters()) ?
EMPTY_ORDERING : POSITION_DELETE_ORDERING;
case MERGE:
return POSITION_DELETE_ORDERING;
default:
// For update operations we use the same sort order defined by
// {@link #createDPContext(HiveConf, org.apache.hadoop.hive.ql.metadata.Table)}
return ImmutableList.of();
return EMPTY_ORDERING;
}
}

@@ -2151,14 +2162,7 @@ public boolean supportsMergeFiles() {

@Override
public List<FileStatus> getMergeTaskInputFiles(Properties properties) throws IOException {
String tableName = properties.getProperty(Catalogs.NAME);
String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
Configuration configuration = SessionState.getSessionConf();
List<JobContext> originalContextList = HiveIcebergOutputCommitter
.generateJobContext(configuration, tableName, snapshotRef);
List<JobContext> jobContextList = originalContextList.stream()
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
List<JobContext> jobContextList = IcebergMergeTaskProperties.getJobContexts(properties);
if (jobContextList.isEmpty()) {
return Collections.emptyList();
}
@@ -2183,4 +2187,14 @@ public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return hasUndergonePartitionEvolution(table);
}

private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
return exprs.stream().map(v ->
new FieldSchema(v.getName(), v.getTypeInfo().getTypeName(), ""))
.collect(Collectors.toList());
}

private static List<FieldSchema> orderBy(VirtualColumn... exprs) {
return schema(Arrays.asList(exprs));
}
}
@@ -45,14 +45,7 @@ public Path getTmpLocation() {

@Override
public Properties getSplitProperties() throws IOException {
String tableName = properties.getProperty(Catalogs.NAME);
String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
Configuration configuration = SessionState.getSessionConf();
List<JobContext> originalContextList = HiveIcebergOutputCommitter
.generateJobContext(configuration, tableName, snapshotRef);
List<JobContext> jobContextList = originalContextList.stream()
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
List<JobContext> jobContextList = getJobContexts(properties);
if (jobContextList.isEmpty()) {
return null;
}
@@ -62,4 +55,15 @@ public Properties getSplitProperties() throws IOException {
pathToContentFile.put(new Path(String.valueOf(contentFile.path())), contentFile));
return pathToContentFile;
}

static List<JobContext> getJobContexts(Properties properties) {
String tableName = properties.getProperty(Catalogs.NAME);
String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
Configuration configuration = SessionState.getSessionConf();

return HiveIcebergOutputCommitter.generateJobContext(configuration, tableName, snapshotRef)
.stream()
.map(TezUtil::enrichContextWithVertexId)
.collect(Collectors.toList());
}
}
@@ -78,6 +78,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -351,6 +352,10 @@ public static boolean isCopyOnWriteMode(Context.Operation operation, BinaryOpera
return RowLevelOperationMode.COPY_ON_WRITE.modeName().equalsIgnoreCase(mode);
}

public static boolean isFanoutEnabled(Map<String, String> props) {
return PropertyUtil.propertyAsBoolean(props, InputFormatConfig.WRITE_FANOUT_ENABLED, true);
}

public static void performMetadataDelete(Table icebergTable, String branchName, SearchArgument sarg) {
Expression exp = HiveIcebergFilterFactory.generateFilterExpression(sarg);
DeleteFiles deleteFiles = icebergTable.newDelete();
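
The new helper above defaults to true, so fanout (and with it unordered position deletes) stays on unless the table opts out. A minimal usage sketch for turning it off, assuming an already-resolved Iceberg Table handle:

// Sketch: disable fanout so DELETEs fall back to the ordered, clustered delete writer.
table.updateProperties()
    .set(InputFormatConfig.WRITE_FANOUT_ENABLED, "false")
    .commit();

// Reads write.fanout.enabled from the table properties; false after the commit above.
boolean fanoutEnabled = IcebergTableUtil.isFanoutEnabled(table.properties());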
@@ -56,6 +56,10 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
positionDeleteRowSchema);
}

static Builder builderFor(Table table) {
return new Builder(table);
}

@Override
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
@@ -100,4 +104,49 @@ protected void configureEqualityDelete(ORC.DeleteWriteBuilder deleteWriteBuilder
protected void configurePositionDelete(ORC.DeleteWriteBuilder deleteWriteBuilder) {
deleteWriteBuilder.createWriterFunc(GenericOrcWriter::buildWriter);
}

static class Builder {
private final Table table;
private FileFormat dataFileFormat;
private Schema dataSchema;
private FileFormat deleteFileFormat;
private Schema positionDeleteRowSchema;

Builder(Table table) {
this.table = table;
}

Builder dataFileFormat(FileFormat newDataFileFormat) {
this.dataFileFormat = newDataFileFormat;
return this;
}

Builder dataSchema(Schema newDataSchema) {
this.dataSchema = newDataSchema;
return this;
}

Builder deleteFileFormat(FileFormat newDeleteFileFormat) {
this.deleteFileFormat = newDeleteFileFormat;
return this;
}

Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
this.positionDeleteRowSchema = newPositionDeleteRowSchema;
return this;
}

HiveFileWriterFactory build() {
return new HiveFileWriterFactory(
table,
dataFileFormat,
dataSchema,
null,
deleteFileFormat,
null,
null,
null,
positionDeleteRowSchema);
}
}
}
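
A possible call site for the new builder (the file formats here are illustrative; the real callers pick them from the table's write properties, and position-only deletes leave the row schema unset):

HiveFileWriterFactory writerFactory = HiveFileWriterFactory.builderFor(table)
    .dataFileFormat(FileFormat.PARQUET)
    .dataSchema(table.schema())
    .deleteFileFormat(FileFormat.PARQUET)
    // no positionDeleteRowSchema: position-only delete files carry no row data
    .build();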
@@ -21,22 +21,18 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@@ -47,13 +43,12 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
private final GenericRecord rowDataTemplate;
private final List<DataFile> replacedDataFiles;

HiveIcebergCopyOnWriteRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
long targetFileSize) {
super(schema, specs, io,
new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
this.currentSpecId = currentSpecId;
this.rowDataTemplate = GenericRecord.create(schema);
HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
OutputFileFactory deleteFileFactory, Context context) {
super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));

this.currentSpecId = table.spec().specId();
this.rowDataTemplate = GenericRecord.create(table.schema());
this.replacedDataFiles = Lists.newArrayList();
}

@@ -21,23 +21,19 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
import org.apache.iceberg.mr.mapred.Container;

class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
@@ -46,14 +42,13 @@ class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
private final boolean skipRowData;
private final boolean isMergeTask;

HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io,
long targetFileSize, boolean skipRowData, boolean isMergeTask) {
super(schema, specs, io,
new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSize));
rowDataTemplate = GenericRecord.create(schema);
this.skipRowData = skipRowData;
this.isMergeTask = isMergeTask;
HiveIcebergDeleteWriter(Table table, HiveFileWriterFactory writerFactory,
OutputFileFactory deleteFileFactory, Context context) {
super(table, newDeleteWriter(table, writerFactory, deleteFileFactory, context));

this.rowDataTemplate = GenericRecord.create(table.schema());
this.skipRowData = context.skipRowData();
this.isMergeTask = context.isMergeTask();
}

@Override
