Skip to content

Commit

Permalink
Merge branch 'master' into fix_table_sync_for_external_catalogs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jzjsnow authored Feb 26, 2025
2 parents d7fd6f3 + 2ff66e5 commit 4259e5d
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -63,6 +65,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -75,8 +78,10 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -650,7 +655,10 @@ private Set<String> deleteInvalidMetadataFile(
}

CloseableIterable<FileEntry> fileScan(
Table table, Expression dataFilter, DataExpirationConfig expirationConfig) {
Table table,
Expression dataFilter,
DataExpirationConfig expirationConfig,
long expireTimestamp) {
TableScan tableScan = table.newScan().filter(dataFilter).includeColumnStats();

CloseableIterable<FileScanTask> tasks;
Expand Down Expand Up @@ -680,6 +688,7 @@ CloseableIterable<FileEntry> fileScan(
.collect(Collectors.toSet());

Types.NestedField field = table.schema().findField(expirationConfig.getExpirationField());
Comparable<?> expireValue = getExpireValue(expirationConfig, field, expireTimestamp);
return CloseableIterable.transform(
CloseableIterable.withNoopClose(Iterables.concat(dataFiles, deleteFiles)),
contentFile -> {
Expand All @@ -689,7 +698,8 @@ CloseableIterable<FileEntry> fileScan(
field,
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(), Locale.getDefault()),
expirationConfig.getNumberDateFormat());
expirationConfig.getNumberDateFormat(),
expireValue);
return new FileEntry(contentFile.copyWithoutStats(), literal);
});
}
Expand All @@ -698,7 +708,8 @@ protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
try (CloseableIterable<FileEntry> entries =
fileScan(table, dataFilter, expirationConfig, expireTimestamp)) {
Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
entries.forEach(
e -> {
Expand All @@ -716,6 +727,33 @@ protected ExpireFiles expiredFileScan(
return expiredFiles;
}

/**
 * Converts the expiration instant into a value expressed in the same representation as the
 * expiration field, so that it can be compared against values of that field.
 *
 * @param expirationConfig supplies the number date format (LONG fields) and the date-time
 *     pattern (STRING fields)
 * @param field the expiration field; its type id selects the conversion
 * @param expireTimestamp the expiration instant in epoch milliseconds
 * @return a Long for TIMESTAMP and LONG fields, or a formatted String for STRING fields
 * @throws IllegalArgumentException if the field type is not TIMESTAMP, LONG or STRING, or if a
 *     LONG field is configured with an unrecognized number date format
 */
private Comparable<?> getExpireValue(
    DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
  switch (field.type().typeId()) {
    case TIMESTAMP:
      // expireTimestamp is in milliseconds, TIMESTAMP type is in microseconds
      return expireTimestamp * 1000;
    case LONG:
      {
        String numberFormat = expirationConfig.getNumberDateFormat();
        if (numberFormat.equals(EXPIRE_TIMESTAMP_MS)) {
          return expireTimestamp;
        }
        if (numberFormat.equals(EXPIRE_TIMESTAMP_S)) {
          return expireTimestamp / 1000;
        }
        throw new IllegalArgumentException("Number dateformat: " + numberFormat);
      }
    case STRING:
      {
        // Render the instant in the field's zone, then format it with the configured pattern.
        LocalDateTime expireDateTime =
            LocalDateTime.ofInstant(
                Instant.ofEpochMilli(expireTimestamp), getDefaultZoneId(field));
        DateTimeFormatter pattern =
            DateTimeFormatter.ofPattern(
                expirationConfig.getDateTimePattern(), Locale.getDefault());
        return expireDateTime.format(pattern);
      }
    default:
      throw new IllegalArgumentException(
          "Unsupported expiration field type: " + field.type().typeId());
  }
}

/**
* Create a filter expression for expired files for the `FILE` level. For the `PARTITION` level,
* we need to collect the oldest files to determine if the partition is obsolete, so we will not
Expand Down Expand Up @@ -917,17 +955,20 @@ static boolean willNotRetain(
}
}

private static Literal<Long> getExpireTimestampLiteral(
private Literal<Long> getExpireTimestampLiteral(
ContentFile<?> contentFile,
Types.NestedField field,
DateTimeFormatter formatter,
String numberDateFormatter) {
String numberDateFormatter,
Comparable<?> expireValue) {
Type type = field.type();
Object upperBound =
Conversions.fromByteBuffer(type, contentFile.upperBounds().get(field.fieldId()));
Literal<Long> literal = Literal.of(Long.MAX_VALUE);
if (null == upperBound) {
return literal;
if (canBeExpireByPartitionValue(contentFile, field, expireValue)) {
literal = Literal.of(0L);
}
} else if (upperBound instanceof Long) {
switch (type.typeId()) {
case TIMESTAMP:
Expand All @@ -951,9 +992,40 @@ private static Literal<Long> getExpireTimestampLiteral(
.toInstant()
.toEpochMilli());
}

return literal;
}

/**
 * Decides from partition values alone whether a file can be treated as expired.
 *
 * <p>For every partition field of the file's spec whose source column is the expiration field,
 * the expiration value is pushed through that partition transform and compared with the file's
 * partition value. The file qualifies only if at least one such partition field exists and every
 * comparison says the partition value lies before the transformed expiration value (or is equal
 * to it, for STRING expiration fields).
 *
 * @param contentFile the file whose partition tuple is inspected
 * @param expireField the expiration field of the table schema
 * @param expireValue the expiration value, expressed in the representation of {@code expireField}
 * @return {@code true} if the partition values prove the file is expired; {@code false} if the
 *     expiration field is not a partition source, is partitioned by a void transform, has a null
 *     partition value, or any comparison fails
 */
@SuppressWarnings("unchecked")
private boolean canBeExpireByPartitionValue(
    ContentFile<?> contentFile, Types.NestedField expireField, Comparable<?> expireValue) {
  PartitionSpec partitionSpec = table.specs().get(contentFile.specId());
  // Index of the current partition field inside the file's partition tuple.
  int pos = 0;
  List<Boolean> compareResults = new ArrayList<>();
  for (PartitionField partitionField : partitionSpec.fields()) {
    if (partitionField.sourceId() == expireField.fieldId()) {
      // A void transform carries no information about the source value.
      if (partitionField.transform().isVoid()) {
        return false;
      }

      Comparable<?> partitionUpperBound =
          ((SerializableFunction<Comparable<?>, Comparable<?>>)
                  partitionField.transform().bind(expireField.type()))
              .apply(expireValue);
      Comparable<Object> filePartitionValue =
          contentFile.partition().get(pos, partitionUpperBound.getClass());
      // A null partition value cannot be ordered against the bound; calling compareTo on it
      // would throw NullPointerException, so treat the file as not provably expired.
      if (filePartitionValue == null) {
        return false;
      }

      int compared = filePartitionValue.compareTo(partitionUpperBound);
      // STRING fields also accept equality with the bound; other types must be strictly below.
      Boolean compareResult =
          expireField.type() == Types.StringType.get() ? compared <= 0 : compared < 0;
      compareResults.add(compareResult);
    }

    pos++;
  }

  return !compareResults.isEmpty() && compareResults.stream().allMatch(Boolean::booleanValue);
}

/**
 * Returns the table held by this instance.
 *
 * @return the {@code table} field
 */
public Table getTable() {
  return this.table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ public void expireDataFrom(DataExpirationConfig expirationConfig, Instant instan

CloseableIterable<MixedFileEntry> changeEntries =
CloseableIterable.transform(
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig),
changeMaintainer.fileScan(changeTable, dataFilter, expirationConfig, expireTimestamp),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), true));
CloseableIterable<MixedFileEntry> baseEntries =
CloseableIterable.transform(
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig),
baseMaintainer.fileScan(baseTable, dataFilter, expirationConfig, expireTimestamp),
e -> new MixedFileEntry(e.getFile(), e.getTsBound(), false));
IcebergTableMaintainer.ExpireFiles changeExpiredFiles =
new IcebergTableMaintainer.ExpireFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.amoro.BasicTableTestHelper.PRIMARY_KEY_SPEC;
import static org.apache.amoro.BasicTableTestHelper.SPEC;
import static org.junit.Assume.assumeTrue;

import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -194,6 +196,7 @@ private void testUnKeyedPartitionLevel() {

List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
// retention time is 1 day, expire partitions that are older than 2022-01-02
if (expireByStringDate()) {
expected =
Lists.newArrayList(
Expand Down Expand Up @@ -464,6 +467,20 @@ public void testNormalFieldFileLevel() {
testFileLevel();
}

/**
 * Runs the partition-level expiration test after setting the table's default write metrics mode
 * to {@code none}. The test is skipped unless the table format is MIXED_ICEBERG or ICEBERG.
 */
@Test
public void testExpireByPartitionWhenMetricsModeIsNone() {
  boolean icebergBased =
      getMixedTable().format().in(TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG);
  assumeTrue(icebergBased);

  String noMetricsMode = MetricsModes.None.get().toString();
  getMixedTable()
      .updateProperties()
      .set(org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE, noMetricsMode)
      .commit();

  testPartitionLevel();
}

@Test
public void testGcDisabled() {
MixedTable testTable = getMixedTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public GenericBaseTaskWriter buildBaseWriter() {
- 1;
return new GenericBaseTaskWriter(
fileFormat,
new GenericAppenderFactory(base.schema(), table.spec()),
new GenericAppenderFactory(base.schema(), table.spec()).setAll(table.properties()),
new CommonOutputFileFactory(
base.location(),
table.spec(),
Expand Down Expand Up @@ -161,7 +161,7 @@ public SortedPosDeleteWriter<Record> buildBasePosDeleteWriter(
TableProperties.BASE_FILE_FORMAT, TableProperties.BASE_FILE_FORMAT_DEFAULT)
.toUpperCase(Locale.ENGLISH)));
GenericAppenderFactory appenderFactory =
new GenericAppenderFactory(base.schema(), table.spec());
new GenericAppenderFactory(base.schema(), table.spec()).setAll(table.properties());
appenderFactory.set(
org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX
+ MetadataColumns.DELETE_FILE_PATH.name(),
Expand Down Expand Up @@ -219,7 +219,7 @@ public GenericChangeTaskWriter buildChangeWriter() {
Schema changeWriteSchema = SchemaUtil.changeWriteSchema(change.schema());
return new GenericChangeTaskWriter(
fileFormat,
new GenericAppenderFactory(changeWriteSchema, table.spec()),
new GenericAppenderFactory(changeWriteSchema, table.spec()).setAll(table.properties()),
new CommonOutputFileFactory(
change.location(),
table.spec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testStats() {
.matches(
"VALUES "
+ "('id', NULL, NULL, 0e0, NULL, '1', '4'), "
+ "('name$name', 548e0, NULL, 0e0, NULL, NULL, NULL), "
+ "('name$name', 429e0, NULL, 0e0, NULL, NULL, NULL), "
+ "('op_time', NULL, NULL, 0e0, NULL, '2022-01-01 12:00:00.000000', '2022-01-04 12:00:00.000000'), "
+ "(NULL, NULL, NULL, NULL, 4e0, NULL, NULL)");
}
Expand Down

0 comments on commit 4259e5d

Please sign in to comment.