dlo with delete file raw stats
cbb330 committed Feb 15, 2025
1 parent 81ca32e commit 5867db2
Showing 8 changed files with 142 additions and 41 deletions.
@@ -80,18 +80,29 @@ private void appendToDloStrategiesTable(
if (isPartitionScope) {
rows.add(
String.format(
"('%s', '%s', '%s', current_timestamp(), %f, %f, %f)",
"('%s', '%s', '%s', current_timestamp(), %f, %f, %f, %f, %f, %f, %f)",
fqtn,
strategy.getPartitionId(),
strategy.getPartitionColumns(),
strategy.getCost(),
strategy.getGain(),
strategy.getEntropy()));
strategy.getEntropy(),
strategy.getPosDeleteFileCount(),
strategy.getEqDeleteFileCount(),
strategy.getPosDeleteFileBytes(),
strategy.getEqDeleteFileBytes()));
} else {
rows.add(
String.format(
"('%s', current_timestamp(), %f, %f, %f)",
fqtn, strategy.getCost(), strategy.getGain(), strategy.getEntropy()));
"('%s', current_timestamp(), %f, %f, %f, %f, %f, %f, %f)",
fqtn,
strategy.getCost(),
strategy.getGain(),
strategy.getEntropy(),
strategy.getPosDeleteFileCount(),
strategy.getEqDeleteFileCount(),
strategy.getPosDeleteFileBytes(),
strategy.getEqDeleteFileBytes()));
}
}
String strategiesInsertStmt =
@@ -114,6 +125,10 @@ private void createTableIfNotExists(
+ "estimated_compute_cost DOUBLE, "
+ "estimated_file_count_reduction DOUBLE, "
+ "file_size_entropy DOUBLE "
+ "pos_delete_file_count DOUBLE "
+ "eq_delete_file_count DOUBLE "
+ "pos_delete_file_bytes DOUBLE "
+ "eq_delete_file_bytes DOUBLE "
+ ") "
+ "PARTITIONED BY (days(timestamp))",
outputFqtn));
@@ -126,6 +141,10 @@ private void createTableIfNotExists(
+ "estimated_compute_cost DOUBLE, "
+ "estimated_file_count_reduction DOUBLE, "
+ "file_size_entropy DOUBLE "
+ "pos_delete_file_count DOUBLE "
+ "eq_delete_file_count DOUBLE "
+ "pos_delete_file_bytes DOUBLE "
+ "eq_delete_file_bytes DOUBLE "
+ ") "
+ "PARTITIONED BY (days(timestamp))",
outputFqtn));
@@ -0,0 +1,37 @@
package com.linkedin.openhouse.jobs.spark;

import com.linkedin.openhouse.jobs.util.OtelConfig;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

@Slf4j
public class DataLayoutStrategyGeneratorSparkAppTest extends OpenHouseSparkITest {
private final Meter meter = OtelConfig.getMeter(this.getClass().getName());

@Test
public void testDataLayoutStrategyGeneratorSparkApp() throws Exception {
final String tableName = "db.test_dlo_strategy_sql";
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
// TODO: use the below style of testing to create delete files, execute DLO strategy generation,
// and validate that the DLO strategies table was created with the expected contents
// val rowsBatchSize = 10
// val startDayLookback = 0
// val numExpectedDeleteFiles = 1
// val numExpectedDataFiles = 1
// createTable(fqtn)
// appendRows(fqtn, rowsBatchSize, startDayLookback)
// verifyInitialFiles(fqtn, numExpectedDataFiles)
// verifyInitialRows(fqtn, rowsBatchSize)
// deleteFirstRecordWithoutSpark(fqtn)
// verifyFilesAfterDelete(fqtn, numExpectedDeleteFiles)
// verifyRowsAfterDelete(fqtn, rowsBatchSize - 1)
// appendRows(fqtn, rowsBatchSize, startDayLookback)
// val jobId = submitCompactionJob(fqtn)
// assert(waitJobSucceeded(jobId), "Compaction job didn't succeed")
// touchTable(fqtn)
// verifyDeleteFilesRemovedByCompaction(fqtn, numExpectedDeleteFiles)
}
}
}
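A minimal sketch of the flow the TODO above describes, using plain Spark SQL on the session provided by OpenHouseSparkITest. The table name, the merge-on-read/format-version table properties, and the use of the Iceberg files metadata table are illustrative assumptions; the helper methods named in the TODO (createTable, appendRows, submitCompactionJob, etc.) are not implemented here.

// Hypothetical sketch, not the committed test: produce a position delete file and inspect it.
// Requires: import org.apache.spark.sql.SparkSession;
SparkSession spark = getSparkSession();
spark.sql("CREATE TABLE db.test_dlo_strategy_sql (id INT, data STRING)");
// Assumption: the table accepts Iceberg v2 merge-on-read delete properties.
spark.sql(
    "ALTER TABLE db.test_dlo_strategy_sql SET TBLPROPERTIES "
        + "('format-version' = '2', 'write.delete.mode' = 'merge-on-read')");
spark.sql("INSERT INTO db.test_dlo_strategy_sql VALUES (0, 'a'), (1, 'b'), (2, 'c')");
spark.sql("DELETE FROM db.test_dlo_strategy_sql WHERE id = 0"); // writes a position delete file
// Rows with content = 1 (POSITION_DELETES) should now appear alongside content = 0 (DATA),
// which is what TableFileStats surfaces and buildDataLayoutStrategy aggregates.
spark.sql(
    "SELECT content, file_path, file_size_in_bytes FROM db.test_dlo_strategy_sql.files")
    .show();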
@@ -12,7 +12,8 @@
@NoArgsConstructor
@AllArgsConstructor
public final class FileStat {
private int content;
private String path;
private long size;
private long sizeInBytes;
private List<String> partitionValues;
}
@@ -26,13 +26,15 @@ public Dataset<FileStat> get() {
return spark
.sql(
String.format(
"SELECT file_path, file_size_in_bytes, partition FROM %s.data_files", tableName))
"SELECT content, file_path, file_size_in_bytes, partition FROM %s.data_files",
tableName))
.map(new FileStatMapper(), Encoders.bean(FileStat.class));
} catch (IllegalArgumentException e) {
return spark
.sql(
String.format(
"SELECT file_path, file_size_in_bytes, null FROM %s.data_files", tableName))
"SELECT content, file_path, file_size_in_bytes, null FROM %s.data_files",
tableName))
.map(new FileStatMapper(), Encoders.bean(FileStat.class));
}
}
@@ -41,15 +43,16 @@ static class FileStatMapper implements MapFunction<Row, FileStat> {
@Override
public FileStat call(Row row) {
List<String> partitionValues = new ArrayList<>();
Row partition = row.getStruct(2);
Row partition = row.getStruct(3);
if (partition != null) {
for (int i = 0; i < partition.size(); i++) {
partitionValues.add(Objects.toString(partition.get(i)));
}
}
return FileStat.builder()
.path(row.getString(0))
.size(row.getLong(1))
.content(row.getInt(0))
.path(row.getString(1))
.sizeInBytes(row.getLong(2))
.partitionValues(partitionValues)
.build();
}
@@ -12,6 +12,7 @@
import java.util.List;
import java.util.Optional;
import lombok.Builder;
import org.apache.iceberg.FileContent;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
@@ -57,11 +58,10 @@ public List<DataLayoutStrategy> generate() {
@Override
public List<DataLayoutStrategy> generateTableLevelStrategies() {
// Retrieve file sizes of all data files.
Dataset<Long> fileSizes =
tableFileStats.get().map((MapFunction<FileStat, Long>) FileStat::getSize, Encoders.LONG());
Dataset<FileStat> fileStats = tableFileStats.get();
long partitionCount = tablePartitionStats.get().count();
Optional<DataLayoutStrategy> strategy =
buildDataLayoutStrategy(fileSizes, partitionCount, null, null);
buildDataLayoutStrategy(fileStats, partitionCount, null, null);
return strategy.map(Collections::singletonList).orElse(Collections.emptyList());
}

@@ -73,22 +73,21 @@ public List<DataLayoutStrategy> generateTableLevelStrategies() {
public List<DataLayoutStrategy> generatePartitionLevelStrategies() {
List<DataLayoutStrategy> strategies = new ArrayList<>();
List<PartitionStat> partitionStatsList = tablePartitionStats.get().collectAsList();
Dataset<FileStat> fileSizesDataset = tableFileStats.get();
String partitionColumns = String.join(", ", tablePartitionStats.getPartitionColumns());
// For each partition, generate a compaction strategy
partitionStatsList.forEach(
partitionStat -> {
String partitionValues = String.join(", ", partitionStat.getValues());
Dataset<Long> fileSizes =
fileSizesDataset
Dataset<FileStat> fileStats =
tableFileStats
.get()
.filter(
(FilterFunction<FileStat>)
fileStat ->
String.join(", ", fileStat.getPartitionValues())
.equals(partitionValues))
.map((MapFunction<FileStat, Long>) FileStat::getSize, Encoders.LONG());
.equals(partitionValues));
Optional<DataLayoutStrategy> strategy =
buildDataLayoutStrategy(fileSizes, 1L, partitionValues, partitionColumns);
buildDataLayoutStrategy(fileStats, 1L, partitionValues, partitionColumns);
strategy.ifPresent(strategies::add);
});
return strategies;
@@ -106,31 +105,37 @@ public List<DataLayoutStrategy> generatePartitionLevelStrategies() {
* </ul>
*/
private Optional<DataLayoutStrategy> buildDataLayoutStrategy(
Dataset<Long> fileSizes,
Dataset<FileStat> fileStats,
long partitionCount,
String partitionValues,
String partitionColumns) {
Dataset<Long> filteredSizes =
fileSizes.filter(
(FilterFunction<Long>)
size ->
size < TARGET_BYTES_SIZE * DataCompactionConfig.MIN_BYTE_SIZE_RATIO_DEFAULT);

Dataset<FileStat> dataFiles =
fileStats.filter(
(FilterFunction<FileStat>) file -> file.getContent() == FileContent.DATA.id());

Dataset<FileStat> filteredDataFiles =
dataFiles.filter(
(FilterFunction<FileStat>)
file ->
file.getSizeInBytes()
< TARGET_BYTES_SIZE * DataCompactionConfig.MIN_BYTE_SIZE_RATIO_DEFAULT);

// Check whether we have anything to map/reduce on for cost computation, this is only the case
// if we have small files that need to be compacted.
if (filteredSizes.count() == 0) {
if (filteredDataFiles.count() == 0) {
return Optional.empty();
}
// Traits computation (cost, gain, and entropy).
Tuple2<Long, Integer> fileStats =
filteredSizes
.map(
(MapFunction<Long, Tuple2<Long, Integer>>) size -> new Tuple2<>(size, 1),
Encoders.tuple(Encoders.LONG(), Encoders.INT()))
.reduce(
(ReduceFunction<Tuple2<Long, Integer>>)
(a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2));
long rewriteFileBytes = fileStats._1;
int rewriteFileCount = fileStats._2;

Tuple2<Long, Integer> filteredDataFileStats =
computeFileStats(filteredDataFiles, FileContent.DATA);
Tuple2<Long, Integer> posDeleteStats =
computeFileStats(fileStats, FileContent.POSITION_DELETES);
Tuple2<Long, Integer> eqDeleteStats = computeFileStats(fileStats, FileContent.EQUALITY_DELETES);

long rewriteFileBytes = filteredDataFileStats._1;
int rewriteFileCount = filteredDataFileStats._2;

long reducedFileCount = estimateReducedFileCount(rewriteFileBytes, rewriteFileCount);
double computeGbHr = estimateComputeGbHr(rewriteFileBytes);
// computeGbHr >= COMPUTE_STARTUP_COST_GB_HR
@@ -142,9 +147,16 @@ private Optional<DataLayoutStrategy> buildDataLayoutStrategy(
.cost(computeGbHr)
.gain(reducedFileCount)
.score(reducedFileCountPerComputeGbHr)
.entropy(computeEntropy(fileSizes))
.entropy(
computeEntropy(
dataFiles.map(
(MapFunction<FileStat, Long>) FileStat::getSizeInBytes, Encoders.LONG())))
.partitionId(partitionValues)
.partitionColumns(partitionColumns)
.posDeleteFileBytes(posDeleteStats._1)
.eqDeleteFileBytes(eqDeleteStats._1)
.posDeleteFileCount(posDeleteStats._2)
.eqDeleteFileCount(eqDeleteStats._2)
.build());
}

@@ -206,4 +218,22 @@ private double computeEntropy(Dataset<Long> fileSizes) {
// Normalize.
return mse / fileSizes.count();
}

private Tuple2<Long, Integer> computeFileStats(Dataset<FileStat> files, FileContent content) {
Dataset<FileStat> filesOfContent =
files.filter((FilterFunction<FileStat>) file -> file.getContent() == content.id());

if (filesOfContent.count() == 0) {
return new Tuple2<>(0L, 0);
}

return filesOfContent
.map((MapFunction<FileStat, Long>) FileStat::getSizeInBytes, Encoders.LONG())
.map(
(MapFunction<Long, Tuple2<Long, Integer>>) size -> new Tuple2<>(size, 1),
Encoders.tuple(Encoders.LONG(), Encoders.INT()))
.reduce(
(ReduceFunction<Tuple2<Long, Integer>>)
(a, b) -> new Tuple2<>(a._1 + b._1, a._2 + b._2));
}
}
@@ -18,4 +18,8 @@ public class DataLayoutStrategy {
// TODO: support sorting config
private final String partitionId;
private final String partitionColumns;
private final double posDeleteFileCount;
private final double eqDeleteFileCount;
private final double posDeleteFileBytes;
private final double eqDeleteFileBytes;
}
@@ -26,7 +26,7 @@ public void testNonPartitionedTableFileStats() throws Exception {
TableFileStats.builder().spark(spark).tableName(testTable).build();
Map<String, Long> stats =
tableFileStats.get().collectAsList().stream()
.collect(Collectors.toMap(FileStat::getPath, FileStat::getSize));
.collect(Collectors.toMap(FileStat::getPath, FileStat::getSizeInBytes));
FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
Path tableDirectory =
new Path(
@@ -77,7 +77,7 @@ public void testPartitionedTableFileStats() throws Exception {
String idPartitionValue = fileStat.getPartitionValues().get(1);
String folder = "data" + "/ts_day=" + tsPartitionValue + "/id=" + idPartitionValue;
FileStatus fileStatus = fs.listStatus(new Path(tableDirectory, folder))[0];
Assertions.assertEquals(fileStatus.getLen(), fileStat.getSize());
Assertions.assertEquals(fileStatus.getLen(), fileStat.getSizeInBytes());
}
}
}
@@ -50,8 +50,11 @@ void testTableLevelStrategySanityCheck() throws Exception {
Assertions.assertNull(strategy.getPartitionColumns());
// few groups, expect 1 commit
Assertions.assertEquals(1, strategy.getConfig().getPartialProgressMaxCommits());
Assertions.assertEquals(
0, strategy.getPosDeleteFileCount(), "Table should have 0 position delete files");
Assertions.assertEquals(
0, strategy.getEqDeleteFileCount(), "Table should have 0 equality delete files");
Assertions.assertTrue(strategy.getConfig().isPartialProgressEnabled());

Assertions.assertTrue(
strategy.getGain() == 5, "Gain for 6 files compaction in 2 partitions should be 5");
Assertions.assertTrue(
@@ -105,6 +108,10 @@ void testPartitionLevelStrategySanityCheck() throws Exception {
// few groups, expect 1 commit
Assertions.assertEquals(1, strategy.getConfig().getPartialProgressMaxCommits());
Assertions.assertTrue(strategy.getConfig().isPartialProgressEnabled());
Assertions.assertEquals(
0, strategy.getPosDeleteFileCount(), "Table should have 0 position delete files");
Assertions.assertEquals(
0, strategy.getEqDeleteFileCount(), "Table should have 0 equality delete files");
Assertions.assertTrue(
strategy.getGain() == 2, "Gain for 3 files compaction in 1 partitions should be 2");
Assertions.assertTrue(
