Skip to content

Commit

Permalink
HIVE-27771: Iceberg: Allow expire snapshot by time range. (#4776). (A…
Browse files Browse the repository at this point in the history
…yush Saxena, reviewed by Denys Kuzmenko)
  • Loading branch information
ayushtkn authored Oct 9, 2023
1 parent 20be17d commit dec006e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,10 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
}
if (expireSnapshotsSpec.isExpireByIds()) {
if (expireSnapshotsSpec.isExpireByTimestampRange()) {
expireSnapshotByTimestampRange(icebergTable, expireSnapshotsSpec.getFromTimestampMillis(),
expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
} else if (expireSnapshotsSpec.isExpireByIds()) {
expireSnapshotByIds(icebergTable, expireSnapshotsSpec.getIdsToExpire(), deleteExecutorService);
} else {
expireSnapshotOlderThanTimestamp(icebergTable, expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService);
Expand All @@ -846,6 +849,23 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap
}
}

private void expireSnapshotByTimestampRange(Table icebergTable, Long fromTimestamp, Long toTimestamp,
ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
for (Snapshot snapshot : icebergTable.snapshots()) {
if (snapshot.timestampMillis() >= fromTimestamp && snapshot.timestampMillis() <= toTimestamp) {
expireSnapshots.expireSnapshotId(snapshot.snapshotId());
LOG.debug("Expiring snapshot on {} with id: {} and timestamp: {}", icebergTable.name(), snapshot.snapshotId(),
snapshot.timestampMillis());
}
}
LOG.info("Expiring snapshot on {} within time range {} -> {}", icebergTable.name(), fromTimestamp, toTimestamp);
if (deleteExecutorService != null) {
expireSnapshots.executeDeleteWith(deleteExecutorService);
}
expireSnapshots.commit();
}

private void expireSnapshotOlderThanTimestamp(Table icebergTable, Long timestamp,
ExecutorService deleteExecutorService) {
ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots().expireOlderThan(timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.commons.collections4.IterableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -65,4 +67,19 @@ public void testExpireSnapshotsWithSnapshotId() throws IOException, InterruptedE
table.refresh();
Assert.assertEquals(7, IterableUtils.size(table.snapshots()));
}

@Test
public void testExpireSnapshotsWithTimestampRange() throws IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "source");
Table table = testTables.createTableWithVersions(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
String fromTime = simpleDateFormat.format(new Date(table.history().get(5).timestampMillis()));
String toTime = simpleDateFormat.format(new Date(table.history().get(8).timestampMillis()));
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN" +
" '" + fromTime + "' AND '" + toTime + "'");
table.refresh();
Assert.assertEquals(6, IterableUtils.size(table.snapshots()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
| KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral) KW_AND (toTimestamp=StringLiteral)
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
;

alterStatementSuffixDropBranch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
Expand All @@ -42,6 +43,7 @@
import org.apache.hadoop.hive.ql.session.SessionState;

import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -79,7 +81,8 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
desc = getRollbackDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
break;
case HiveParser.KW_EXPIRE_SNAPSHOTS:
desc = getExpireSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren());

break;
case HiveParser.KW_SET_CURRENT_SNAPSHOT:
desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
Expand Down Expand Up @@ -130,18 +133,25 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableNa
}

private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map<String, String> partitionSpec,
ASTNode childNode) throws SemanticException {
List<Node> children) throws SemanticException {
AlterTableExecuteSpec<ExpireSnapshotsSpec> spec;
// the second child must be the rollback parameter

ZoneId timeZone = SessionState.get() == null ?
new HiveConf().getLocalTimeZone() :
SessionState.get().getConf().getLocalTimeZone();
String childText = PlanUtils.stripQuotes(childNode.getText().trim());
if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(childText).matches()) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(childText));
ASTNode firstNode = (ASTNode) children.get(1);
String firstNodeText = PlanUtils.stripQuotes(firstNode.getText().trim());
if (children.size() == 3) {
ASTNode secondNode = (ASTNode) children.get(2);
String secondNodeText = PlanUtils.stripQuotes(secondNode.getText().trim());
TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone);
TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT,
new ExpireSnapshotsSpec(fromTime.toEpochMilli(), toTime.toEpochMilli()));
} else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(firstNodeText));
} else {
TimestampTZ time = TimestampTZUtil.parse(childText, timeZone);
TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
}
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public static class ExpireSnapshotsSpec {
private long timestampMillis = -1L;
private String[] idsToExpire = null;

private long fromTimestampMillis = -1L;

public ExpireSnapshotsSpec(long timestampMillis) {
this.timestampMillis = timestampMillis;
}
Expand All @@ -117,10 +119,19 @@ public ExpireSnapshotsSpec(String ids) {
this.idsToExpire = ids.split(",");
}

public ExpireSnapshotsSpec(long fromTimestampMillis, long toTimestampMillis) {
this.fromTimestampMillis = fromTimestampMillis;
this.timestampMillis = toTimestampMillis;
}

public Long getTimestampMillis() {
return timestampMillis;
}

public Long getFromTimestampMillis() {
return fromTimestampMillis;
}

public String[] getIdsToExpire() {
return idsToExpire;
}
Expand All @@ -129,10 +140,16 @@ public boolean isExpireByIds() {
return idsToExpire != null;
}

public boolean isExpireByTimestampRange() {
return timestampMillis != -1 && fromTimestampMillis != -1;
}

@Override
public String toString() {
MoreObjects.ToStringHelper stringHelper = MoreObjects.toStringHelper(this);
if (isExpireByIds()) {
if (isExpireByTimestampRange()) {
stringHelper.add("fromTimestampMillis", fromTimestampMillis).add("toTimestampMillis", timestampMillis);
} else if (isExpireByIds()) {
stringHelper.add("idsToExpire", Arrays.toString(idsToExpire));
} else {
stringHelper.add("timestampMillis", timestampMillis);
Expand Down

0 comments on commit dec006e

Please sign in to comment.