Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
Expand All @@ -46,6 +45,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;

public class MetastoreUtil {
Expand Down Expand Up @@ -112,17 +112,18 @@ public static void alterTable(
}
}

public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table, int specId) {
Schema schema = table.specs().get(specId).schema();
List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
Map<String, String> colNameToColType = hiveSchema.stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
return table.specs().get(specId).fields().stream()
.map(partField -> new FieldSchema(
schema.findColumnName(partField.sourceId()),
colNameToColType.get(schema.findColumnName(partField.sourceId())),
String.format("Transform: %s", partField.transform().toString()))
)
public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table) {
Schema schema = table.spec().schema();

return table.spec().fields().stream()
.map(partField -> {
Types.NestedField col = schema.findField(partField.sourceId());
return new FieldSchema(
col.name(),
HiveSchemaUtil.convertToTypeString(col.type()),
"Transform: %s".formatted(partField.transform())
);
})
.toList();
}

Expand All @@ -134,7 +135,7 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co
result.setDbName(tableName.getDb());
result.setTableName(tableName.getTable());
result.setTableType(TableType.EXTERNAL_TABLE.toString());
result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
result.setPartitionKeys(getPartitionKeys(table));
TableMetadata metadata = ((BaseTable) table).operations().current();
long maxHiveTablePropertySize = conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2127,7 +2127,7 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
/**
 * Resolves the Hive {@link Partition} objects matching the given partition spec.
 *
 * @param hmsTable the Hive table backed by an Iceberg table
 * @param partitionSpec column-name to value mapping used to filter partition names
 * @param latestSpecOnly when true, restrict results to partitions of the current Iceberg spec
 * @return the matching partitions, converted from their Iceberg partition names
 * @throws SemanticException if the partition names cannot be resolved
 */
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
    Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
  List<String> partNames = IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, latestSpecOnly);
  return IcebergTableUtil.convertNameToHivePartition(hmsTable, partNames);
}

public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Expand Down Expand Up @@ -2280,7 +2280,7 @@ public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Tab
return Collections.emptyList();
}
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return MetastoreUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
return MetastoreUtil.getPartitionKeys(icebergTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -41,7 +40,6 @@
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -55,7 +53,6 @@
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
Expand All @@ -65,8 +62,6 @@
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.util.Sets;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
Expand Down Expand Up @@ -774,26 +769,6 @@ private static PartitionStats recordToPartitionStats(StructLike record) {
return stats;
}

public static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
throws MetaException, HiveException {
if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
throw new HiveException("Table and partitionPath must not be null or empty.");
}

// Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());

return icebergTable.specs().values().stream()
.filter(spec -> {
List<String> specFieldNames = spec.fields().stream()
.map(PartitionField::name)
.toList();
return specFieldNames.equals(fieldNames);
})
.findFirst() // Supposed to be only one matching spec
.orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
}

public static TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
TransformSpec spec = TransformSpec.fromString(transformName.toUpperCase(),
table.schema().findColumnName(sourceId));
Expand Down Expand Up @@ -849,26 +824,15 @@ public static boolean hasUndergonePartitionEvolution(Snapshot snapshot, FileIO i
.anyMatch(m -> m.partitionSpecId() != 0);
}

public static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
Boolean latestSpecOnly) {
Set<String> partitions = Sets.newHashSet();
int tableSpecId = icebergTable.spec().specId();
for (T file : files) {
if (latestSpecOnly == null || latestSpecOnly.equals(file.specId() == tableSpecId)) {
String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
partitions.add(partName);
}
}
return partitions;
}

public static List<Partition> convertNameToMetastorePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
public static List<Partition> convertNameToHivePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Collection<String> partNames) {
List<Partition> partitions = Lists.newArrayList();
for (String partName : partNames) {
Map<String, String> partSpecMap = Maps.newLinkedHashMap();
Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
partitions.add(new DummyPartition(hmsTable, partName, partSpecMap));
try {
partitions.add(new DummyPartition(hmsTable, partName, Warehouse.makeSpecFromName(partName)));
} catch (MetaException e) {
LOG.error(e.getMessage(), e);
}
}
return partitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@
package org.apache.iceberg.mr.hive.compaction;

import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.util.Sets;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -100,4 +107,37 @@ public static List<DeleteFile> getDeleteFiles(Table table, Long snapshotId, Stri
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
}

/**
 * Looks up the partition spec whose field names exactly match (in order) the field names
 * encoded in a partition path of the form {@code "field1=val1/field2=val2"}.
 *
 * @param icebergTable the table whose historical specs are searched
 * @param partitionPath the Hive-style partition path to match
 * @return the matching spec (at most one is expected to match)
 * @throws MetaException if the partition path cannot be parsed
 * @throws HiveException if the arguments are missing or no spec matches
 */
static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
    throws MetaException, HiveException {
  if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
    throw new HiveException("Table and partitionPath must not be null or empty.");
  }

  // Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
  List<String> pathFieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());

  for (PartitionSpec candidate : icebergTable.specs().values()) {
    List<String> candidateFieldNames = Lists.newArrayList();
    for (PartitionField field : candidate.fields()) {
      candidateFieldNames.add(field.name());
    }
    if (candidateFieldNames.equals(pathFieldNames)) {
      // Supposed to be only one matching spec, so the first hit wins.
      return candidate;
    }
  }
  throw new HiveException("No matching partition spec found for partition path: " + partitionPath);
}

/**
 * Collects the distinct partition paths of the given content files.
 *
 * @param icebergTable table whose specs resolve each file's partition data into a path
 * @param files data/delete files to inspect
 * @param latestSpecOnly when true, include only files written with the table's current spec;
 *     when false, include only files written with an older spec
 * @return the set of matching partition paths
 */
static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
    boolean latestSpecOnly) {
  int currentSpecId = icebergTable.spec().specId();
  Set<String> result = Sets.newHashSet();
  for (T file : files) {
    boolean writtenWithCurrentSpec = file.specId() == currentSpecId;
    if (writtenWithCurrentSpec == latestSpecOnly) {
      result.add(icebergTable.specs().get(file.specId()).partitionToPath(file.partition()));
    }
  }
  return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
PartitionSpec spec;
String partitionPredicate;
try {
spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
spec = IcebergCompactionUtil.getPartitionSpec(icebergTable, ci.partName);
partitionPredicate = buildPartitionPredicate(ci, spec);
} catch (MetaException e) {
throw new HiveException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private List<Partition> findModifiedPartitions(
}
);

return IcebergTableUtil.convertNameToMetastorePartition(
return IcebergTableUtil.convertNameToHivePartition(
hiveTable, modifiedPartitions);
}

Expand All @@ -249,7 +249,7 @@ private List<Callable<Set<String>>> createPartitionNameTasks(

return relevantSnapshots.stream()
.map(snapshot -> (Callable<Set<String>>) () ->
IcebergTableUtil.getPartitionNames(
IcebergCompactionUtil.getPartitionNames(
icebergTable,
getAffectedFiles(snapshot, icebergTable.io()),
latestSpecOnly))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -76,10 +75,10 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
+ ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED.varname + " to work around this error";

private static final Map<String, VirtualColumn> ALLOWED_VIRTUAL_COLUMNS = Collections.unmodifiableMap(
new HashMap<String, VirtualColumn>() {{
put(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID);
put(VirtualColumn.ROWISDELETED.getName(), VirtualColumn.ROWISDELETED);
}});
new HashMap<>() {{
put(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID);
put(VirtualColumn.ROWISDELETED.getName(), VirtualColumn.ROWISDELETED);
}});

private final InputFormat<NullWritable, VectorizedRowBatch> sourceInputFormat;
private final AvoidSplitCombination sourceASC;
Expand Down Expand Up @@ -203,16 +202,16 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

@Override
public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
  // With no source filter registered, combining is always allowed; otherwise delegate.
  return sourceASC != null && sourceASC.shouldSkipCombine(path, conf);
}

static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
// This is based on Vectorizer code, minus the validation.

// Add all non-virtual columns from the TableScan operator.
RowSchema rowSchema = findTsOp(mapWork).getSchema();
final List<String> colNames = new ArrayList<String>(rowSchema.getSignature().size());
final List<TypeInfo> colTypes = new ArrayList<TypeInfo>(rowSchema.getSignature().size());
final List<String> colNames = new ArrayList<>(rowSchema.getSignature().size());
final List<TypeInfo> colTypes = new ArrayList<>(rowSchema.getSignature().size());
ArrayList<VirtualColumn> virtualColumnList = new ArrayList<>(2);
for (ColumnInfo c : rowSchema.getSignature()) {
String columnName = c.getInternalName();
Expand All @@ -232,15 +231,15 @@ static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveExcept
if (paths.hasNext()) {
PartitionDesc partDesc = mapWork.getPathToPartitionInfo().get(paths.next());
if (partDesc != null) {
LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
Map<String, String> partSpec = partDesc.getPartSpec();
if (partSpec != null && !partSpec.isEmpty()) {
partitionColumnCount = partSpec.size();
}
}
}
final VirtualColumn[] virtualColumns = virtualColumnList.toArray(new VirtualColumn[0]);
return new VectorizedRowBatchCtx(colNames.toArray(new String[colNames.size()]),
colTypes.toArray(new TypeInfo[colTypes.size()]), null, null, partitionColumnCount,
return new VectorizedRowBatchCtx(colNames.toArray(new String[0]),
colTypes.toArray(new TypeInfo[0]), null, null, partitionColumnCount,
virtualColumns.length, virtualColumns, new String[0], null);
}

Expand Down
Loading
Loading