HIVE-28278: Iceberg: Stats: IllegalStateException Invalid file: file length 0 (Denys Kuzmenko, reviewed by Ayush Saxena, Butao Zhang)

Closes apache#5261
deniskuzZ authored and dengzhhu653 committed Jun 12, 2024
1 parent 36cefb2 commit b09b397
Showing 5 changed files with 92 additions and 57 deletions.
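
The change in short: writeColStats used to emit the Puffin file at a deterministic path derived from the current snapshot id, and readers rediscovered it by probing the filesystem with exists(). A zero-length or partially written file at that path still passed the probe and only failed on read, which is the IllegalStateException in the title. After this commit each Puffin file is written under a random UUID suffix and registered in Iceberg table metadata as a StatisticsFile, so discovery goes through table.statisticsFiles() and never sees unregistered leftovers. Below is a condensed sketch of the write-and-register pattern, assembled only from the Iceberg APIs visible in this diff; the blob type, createdBy string, and "/stats/" layout are illustrative stand-ins, not Hive's actual constants:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class PuffinStatsSketch {

  // Writes one serialized stats payload and registers it in table metadata.
  static StatisticsFile writeAndRegister(Table table, byte[] payload) throws IOException {
    long snapshotId = table.currentSnapshot().snapshotId();
    // Random suffix: concurrent or failed writers can no longer collide on a
    // deterministic snapshot-id path ("/stats/" stands in for Hive's STATS constant).
    String path = table.location() + "/stats/" + UUID.randomUUID();

    StatisticsFile statsFile;
    try (PuffinWriter writer = Puffin.write(table.io().newOutputFile(path))
        .createdBy("hive-sketch").build()) {
      writer.add(new Blob(
          "hive-column-stats",                 // illustrative blob type
          ImmutableList.of(1),                 // field ids the blob covers
          snapshotId,
          table.currentSnapshot().sequenceNumber(),
          ByteBuffer.wrap(payload),
          PuffinCompressionCodec.NONE,
          ImmutableMap.of()));
      writer.finish();
      statsFile = new GenericStatisticsFile(snapshotId, path,
          writer.fileSize(), writer.footerSize(),
          writer.writtenBlobsMetadata().stream()
              .map(GenericBlobMetadata::from)
              .collect(ImmutableList.toImmutableList()));
    }
    // The metadata commit is what makes the file discoverable: readers ask
    // table.statisticsFiles() instead of probing a hand-built path with exists().
    table.updateStatistics().setStatistics(snapshotId, statsFile).commit();
    return statsFile;
  }
}

The hunks below wire this in: the storage handler builds and commits the GenericStatisticsFile, IcebergTableUtil gains the metadata-based getColStatsPath lookup, and the stats test asserts through that lookup instead of a hand-built path.
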
@@ -28,13 +28,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,7 +47,7 @@
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -139,6 +141,8 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
@@ -156,6 +160,7 @@
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -190,7 +195,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -290,7 +294,7 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
*/
static class HiveIcebergNoJobCommitter extends HiveIcebergOutputCommitter {
@Override
public void commitJob(JobContext originalContext) throws IOException {
public void commitJob(JobContext originalContext) {
// do nothing
}
}
@@ -384,7 +388,7 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese
}
}
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
Expression filterExpr = (Expression) HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
if (filterExpr != null) {
SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, filterExpr);
}
@@ -495,33 +499,58 @@ public boolean canSetColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsT
@Override
public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<ColumnStatistics> colStats) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
String snapshotId = String.format("%s-STATS-%d", tbl.name(), tbl.currentSnapshot().snapshotId());
return writeColStats(colStats.get(0), tbl, snapshotId);
return writeColStats(colStats.get(0), tbl);
}

private boolean writeColStats(ColumnStatistics tableColStats, Table tbl, String snapshotId) {
private boolean writeColStats(ColumnStatistics tableColStats, Table tbl) {
try {
boolean rewriteStats = removeColStatsIfExists(tbl);
if (!rewriteStats) {
if (!shouldRewriteColStats(tbl)) {
checkAndMergeColStats(tableColStats, tbl);
}
// Currently, we are only serializing table level stats.
byte[] serializeColStats = SerializationUtils.serialize(tableColStats);
try (PuffinWriter writer = Puffin.write(tbl.io().newOutputFile(getColStatsPath(tbl).toString()))
StatisticsFile statisticsFile;
String statsPath = tbl.location() + STATS + UUID.randomUUID();

try (PuffinWriter puffinWriter = Puffin.write(tbl.io().newOutputFile(statsPath))
.createdBy(Constants.HIVE_ENGINE).build()) {
writer.add(new Blob(tbl.name() + "-" + snapshotId, ImmutableList.of(1), tbl.currentSnapshot().snapshotId(),
tbl.currentSnapshot().sequenceNumber(), ByteBuffer.wrap(serializeColStats), PuffinCompressionCodec.NONE,
ImmutableMap.of()));
writer.finish();
return true;
long snapshotId = tbl.currentSnapshot().snapshotId();
long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
puffinWriter.add(
new Blob(
ColumnStatisticsObj.class.getSimpleName(),
ImmutableList.of(1),
snapshotId,
snapshotSequenceNumber,
ByteBuffer.wrap(serializeColStats),
PuffinCompressionCodec.NONE,
ImmutableMap.of()
));
puffinWriter.finish();

statisticsFile =
new GenericStatisticsFile(
snapshotId,
statsPath,
puffinWriter.fileSize(),
puffinWriter.footerSize(),
puffinWriter.writtenBlobsMetadata().stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList())
);
} catch (IOException e) {
LOG.warn("Unable to write stats to puffin file {}", e.getMessage());
return false;
}
} catch (InvalidObjectException | IOException e) {
tbl.updateStatistics()
.setStatistics(statisticsFile.snapshotId(), statisticsFile)
.commit();
return true;

} catch (Exception e) {
LOG.warn("Unable to invalidate or merge stats: {}", e.getMessage());
return false;
}
return false;
}

@Override
@@ -531,36 +560,29 @@ public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table
}

private boolean canProvideColStats(Table table, long snapshotId) {
Path statsPath = getColStatsPath(table, snapshotId);
try {
FileSystem fs = statsPath.getFileSystem(conf);
return fs.exists(statsPath);
} catch (Exception e) {
LOG.warn("Exception when trying to find Iceberg column stats for table:{} , snapshot:{} , " +
"statsPath: {} , stack trace: {}", table.name(), table.currentSnapshot(), statsPath, e);
}
return false;
return IcebergTableUtil.getColStatsPath(table, snapshotId).isPresent();
}

@Override
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
Path statsPath = getColStatsPath(table);
LOG.info("Using stats from puffin file at: {}", statsPath);
return readColStats(table, statsPath).getStatsObj();
return IcebergTableUtil.getColStatsPath(table).map(statsPath -> readColStats(table, statsPath))
.orElse(new ColumnStatistics()).getStatsObj();
}

private ColumnStatistics readColStats(Table table, Path statsPath) {
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
Map<BlobMetadata, ColumnStatistics> collect = Streams.stream(reader.readAll(blobMetadata)).collect(
Collectors.toMap(Pair::first, blobMetadataByteBufferPair -> SerializationUtils.deserialize(
ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
return collect.get(blobMetadata.get(0));
} catch (IOException | IndexOutOfBoundsException e) {
LOG.warn(" Unable to read iceberg col stats from puffin files: ", e);
return new ColumnStatistics();
Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
if (it.hasNext()) {
byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
LOG.info("Using col stats from : {}", statsPath);
return SerializationUtils.deserialize(byteBuffer);
}
} catch (Exception e) {
LOG.warn(" Unable to read col stats: ", e);
}
return new ColumnStatistics();
}
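
For the read side just above, a self-contained equivalent (a hypothetical helper, not the exact Hive code) looks like the following: open the registered Puffin file, stream the (metadata, payload) pairs, and Java-deserialize the first payload, returning null on any failure so the caller can fall back to empty statistics.

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.Pair;

class PuffinReadSketch {

  // Returns the Java-deserialized first blob payload, or null if unreadable.
  static <T> T readFirstBlob(Table table, String statsPath) {
    try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath)).build()) {
      List<BlobMetadata> blobs = reader.fileMetadata().blobs();
      // readAll yields (blob metadata, payload) pairs; only payloads are needed here.
      Iterator<ByteBuffer> payloads =
          Iterables.transform(reader.readAll(blobs), Pair::second).iterator();
      if (payloads.hasNext()) {
        return SerializationUtils.deserialize(ByteBuffers.toByteArray(payloads.next()));
      }
      return null; // file contained no blobs
    } catch (Exception e) {
      return null; // the handler above falls back to an empty ColumnStatistics
    }
  }
}
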

@Override
@@ -586,30 +608,18 @@ private String getStatsSource() {
.toUpperCase();
}

private Path getColStatsPath(Table table) {
return getColStatsPath(table, table.currentSnapshot().snapshotId());
}

private Path getColStatsPath(Table table, long snapshotId) {
return new Path(table.location() + STATS + snapshotId);
}

private boolean removeColStatsIfExists(Table tbl) throws IOException {
Path statsPath = getColStatsPath(tbl);
FileSystem fs = statsPath.getFileSystem(conf);
if (fs.exists(statsPath)) {
// Analyze table and stats updater thread
return fs.delete(statsPath, true);
}
private boolean shouldRewriteColStats(Table tbl) {
return SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
.filter(opType -> HiveOperation.ANALYZE_TABLE == opType)
.isPresent();
.filter(opType -> HiveOperation.ANALYZE_TABLE == opType).isPresent() ||
IcebergTableUtil.getColStatsPath(tbl).isPresent();
}

private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) throws InvalidObjectException {
Long previousSnapshotId = tbl.currentSnapshot().parentId();
if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
ColumnStatistics statsObjOld = readColStats(tbl, getColStatsPath(tbl, previousSnapshotId));
ColumnStatistics statsObjOld = IcebergTableUtil.getColStatsPath(tbl, previousSnapshotId)
.map(statsPath -> readColStats(tbl, statsPath))
.orElse(null);
if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
}
@@ -1900,7 +1910,7 @@ private Expression generateExpressionFromPartitionSpec(Table table, Map<String,
.findField(partitionField.sourceId()).type());
Object value = Conversions.fromPartitionString(resultType, entry.getValue());
TransformSpec.TransformType transformType = TransformSpec.fromString(partitionField.transform().toString());
Iterable iterable = () -> Collections.singletonList(value).iterator();
Iterable<?> iterable = () -> Collections.singletonList(value).iterator();
if (TransformSpec.TransformType.IDENTITY == transformType) {
Expression boundPredicate = Expressions.in(partitionField.name(), iterable);
finalExp = Expressions.and(finalExp, boundPredicate);

@@ -27,6 +27,8 @@
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
@@ -133,6 +135,20 @@ static Table getTable(Configuration configuration, Properties properties) {
return getTable(configuration, properties, false);
}

static Optional<Path> getColStatsPath(Table table) {
return getColStatsPath(table, table.currentSnapshot().snapshotId());
}

static Optional<Path> getColStatsPath(Table table, long snapshotId) {
return table.statisticsFiles().stream()
.filter(stats -> stats.snapshotId() == snapshotId)
.filter(stats -> stats.blobMetadata().stream()
.anyMatch(metadata -> ColumnStatisticsObj.class.getSimpleName().equals(metadata.type()))
)
.map(stats -> new Path(stats.path()))
.findAny();
}
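
A note on the two filters above: a table's statisticsFiles() can include files whose blobs were written by other engines (Puffin's standard blob types include apache-datasketches-theta-v1 sketches, for instance), so matching the blob type against ColumnStatisticsObj.class.getSimpleName() picks out only Hive-serialized stats. Callers then chain off the Optional rather than probing the filesystem; paraphrasing the storage-handler hunks earlier in this commit:

  // canProvideColStats reduces to a metadata lookup, no filesystem access:
  boolean canProvide = IcebergTableUtil.getColStatsPath(table, snapshotId).isPresent();

  // getColStatistics reads the registered file, or returns empty stats:
  ColumnStatistics stats = IcebergTableUtil.getColStatsPath(table)
      .map(statsPath -> readColStats(table, statsPath))
      .orElse(new ColumnStatistics());
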

/**
* Create {@link PartitionSpec} based on the partition information stored in
* {@link TransformSpec}.

@@ -38,7 +38,6 @@
import org.junit.Assume;
import org.junit.Test;

import static org.apache.iceberg.mr.hive.HiveIcebergStorageHandler.STATS;

/**
* Tests verifying correct statistics generation behaviour on Iceberg tables triggered by: ANALYZE queries, inserts,
@@ -275,7 +274,9 @@ public void testIcebergColStatsPath() throws IOException {
shell.executeStatement(insert);

table.refresh();
Path tblColPath = new Path(table.location() + STATS + table.currentSnapshot().snapshotId());

Path tblColPath = IcebergTableUtil.getColStatsPath(table).orElse(null);
Assert.assertNotNull(tblColPath);
// Check that if colPath is created correctly
Assert.assertTrue(tblColPath.getFileSystem(shell.getHiveConf()).exists(tblColPath));
List<Object[]> result = shell.executeStatement("SELECT * FROM customers");

@@ -400,6 +400,10 @@ hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
PREHOOK: query: select file from default.ice_meta_3.metadata_log_entries
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_meta_3
@@ -696,6 +700,10 @@
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
PREHOOK: query: select file from default.ice_meta_3.metadata_log_entries
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_meta_3
Binary file not shown.
