From 2ff66e5d066e4a151051cf05b352ef65825fe308 Mon Sep 17 00:00:00 2001 From: Darcy Date: Wed, 26 Feb 2025 13:50:49 +0800 Subject: [PATCH] [AMORO-3272] data-expire by partition info (#3273) * feature: data-expire by partition info * add comments * Refactor the code to improve readability. * Refactor the code to improve readability. * feature: add test cases * fix test error * optimize code * update * checkstyle --------- Co-authored-by: ZhouJinsong Co-authored-by: Congxian Qiu Co-authored-by: Xavier Bai Co-authored-by: baiyangtx Co-authored-by: Xavier Bai --- .../maintainer/IcebergTableMaintainer.java | 84 +++++++++++++++++-- .../maintainer/MixedTableMaintainer.java | 4 +- .../optimizing/maintainer/TestDataExpire.java | 17 ++++ 3 files changed, 97 insertions(+), 8 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index af9b7c9ddb..272e9f224e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -45,6 +45,8 @@ import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -63,6 +65,7 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.SerializableFunction; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,8 +78,10 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; 
+import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -650,7 +655,10 @@ private Set deleteInvalidMetadataFile( } CloseableIterable fileScan( - Table table, Expression dataFilter, DataExpirationConfig expirationConfig) { + Table table, + Expression dataFilter, + DataExpirationConfig expirationConfig, + long expireTimestamp) { TableScan tableScan = table.newScan().filter(dataFilter).includeColumnStats(); CloseableIterable tasks; @@ -680,6 +688,7 @@ CloseableIterable fileScan( .collect(Collectors.toSet()); Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField()); + Comparable expireValue = getExpireValue(expirationConfig, field, expireTimestamp); return CloseableIterable.transform( CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)), contentFile -> { @@ -689,7 +698,8 @@ CloseableIterable fileScan( field, DateTimeFormatter.ofPattern( expirationConfig.getDateTimePattern(), Locale.getDefault()), - expirationConfig.getNumberDateFormat()); + expirationConfig.getNumberDateFormat(), + expireValue); return new FileEntry(contentFile.copyWithoutStats(), literal); }); } @@ -698,7 +708,8 @@ protected ExpireFiles expiredFileScan( DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) { Map partitionFreshness = Maps.newConcurrentMap(); ExpireFiles expiredFiles = new ExpireFiles(); - try (CloseableIterable entries = fileScan(table, dataFilter, expirationConfig)) { + try (CloseableIterable entries = + fileScan(table, dataFilter, expirationConfig, expireTimestamp)) { Queue fileEntries = new LinkedTransferQueue<>(); entries.forEach( e -> { @@ -716,6 +727,33 @@ protected ExpireFiles expiredFileScan( return expiredFiles; } + private Comparable getExpireValue( + DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) { + switch 
(field.type().typeId()) { + // expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds + case TIMESTAMP: + return expireTimestamp * 1000; + case LONG: + if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_MS)) { + return expireTimestamp; + } else if (expirationConfig.getNumberDateFormat().equals(EXPIRE_TIMESTAMP_S)) { + return expireTimestamp / 1000; + } else { + throw new IllegalArgumentException( + "Number dateformat: " + expirationConfig.getNumberDateFormat()); + } + case STRING: + return LocalDateTime.ofInstant( + Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field)) + .format( + DateTimeFormatter.ofPattern( + expirationConfig.getDateTimePattern(), Locale.getDefault())); + default: + throw new IllegalArgumentException( + "Unsupported expiration field type: " + field.type().typeId()); + } + } + /** * Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level, * we need to collect the oldest files to determine if the partition is obsolete, so we will not @@ -917,17 +955,20 @@ static boolean willNotRetain( } } - private static Literal getExpireTimestampLiteral( + private Literal getExpireTimestampLiteral( ContentFile contentFile, Types.NestedField field, DateTimeFormatter formatter, - String numberDateFormatter) { + String numberDateFormatter, + Comparable expireValue) { Type type = field.type(); Object upperBound = Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId())); Literal literal = Literal.of(Long.MAX_VALUE); if (null == upperBound) { - return literal; + if (canBeExpireByPartitionValue(contentFile, field, expireValue)) { + literal = Literal.of(0L); + } } else if (upperBound instanceof Long) { switch (type.typeId()) { case TIMESTAMP: @@ -951,9 +992,40 @@ private static Literal getExpireTimestampLiteral( .toInstant() .toEpochMilli()); } + return literal; } + @SuppressWarnings("unchecked") + private boolean canBeExpireByPartitionValue( + ContentFile contentFile, 
Types.NestedField expireField, Comparable expireValue) { + PartitionSpec partitionSpec = table.specs().get(contentFile.specId()); + int pos = 0; + List compareResults = new ArrayList<>(); + for (PartitionField partitionField : partitionSpec.fields()) { + if (partitionField.sourceId() == expireField.fieldId()) { + if (partitionField.transform().isVoid()) { + return false; + } + + Comparable partitionUpperBound = + ((SerializableFunction, Comparable>) + partitionField.transform().bind(expireField.type())) + .apply(expireValue); + Comparable filePartitionValue = + contentFile.partition().get(pos, partitionUpperBound.getClass()); + int compared = filePartitionValue.compareTo(partitionUpperBound); + Boolean compareResult = + expireField.type() == Types.StringType.get() ? compared <= 0 : compared < 0; + compareResults.add(compareResult); + } + + pos++; + } + + return !compareResults.isEmpty() && compareResults.stream().allMatch(Boolean::booleanValue); + } + public Table getTable() { return table; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java index 1b5c8cbd82..c36a870da3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java @@ -195,11 +195,11 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan CloseableIterable changeEntries = CloseableIterable.transform( - changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig), + changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig, expireTimestamp), e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true)); CloseableIterable baseEntries = CloseableIterable.transform( - baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig), + 
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig, expireTimestamp), e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false)); IcebergTableMaintainer.ExpireFiles changeExpiredFiles = new IcebergTableMaintainer.ExpireFiles(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java index ec41558712..f54c81decc 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java @@ -20,6 +20,7 @@ import static org.apache.amoro.BasicTableTestHelper.PRIMARY_KEY_SPEC; import static org.apache.amoro.BasicTableTestHelper.SPEC; +import static org.junit.Assume.assumeTrue; import org.apache.amoro.BasicTableTestHelper; import org.apache.amoro.TableFormat; @@ -49,6 +50,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.MetricsModes; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -194,6 +196,7 @@ private void testUnKeyedPartitionLevel() { List expected; if (tableTestHelper().partitionSpec().isPartitioned()) { + // retention time is 1 day, expire partitions that older than 2022-01-02 if (expireByStringDate()) { expected = Lists.newArrayList( @@ -464,6 +467,20 @@ public void testNormalFieldFileLevel() { testFileLevel(); } + @Test + public void testExpireByPartitionWhenMetricsModeIsNone() { + assumeTrue(getMixedTable().format().in(TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG)); + + getMixedTable() + .updateProperties() + .set( + org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE, + MetricsModes.None.get().toString()) + .commit(); + + testPartitionLevel(); + } + @Test public void testGcDisabled() { MixedTable testTable = 
getMixedTable();