From dec006e81e657bc6edb0d57d31076f04e1f088c5 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 9 Oct 2023 16:48:11 +0530 Subject: [PATCH] HIVE-27771: Iceberg: Allow expire snapshot by time range. (#4776). (Ayush Saxena, reviewed by Denys Kuzmenko) --- .../mr/hive/HiveIcebergStorageHandler.java | 22 ++++++++++++++++- .../hive/TestHiveIcebergExpireSnapshots.java | 17 +++++++++++++ .../hadoop/hive/ql/parse/AlterClauseParser.g | 2 ++ .../execute/AlterTableExecuteAnalyzer.java | 24 +++++++++++++------ .../hive/ql/parse/AlterTableExecuteSpec.java | 19 ++++++++++++++- 5 files changed, 75 insertions(+), 9 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index bb9356df2517..4e451403b028 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -834,7 +834,10 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads); } - if (expireSnapshotsSpec.isExpireByIds()) { + if (expireSnapshotsSpec.isExpireByTimestampRange()) { + expireSnapshotByTimestampRange(icebergTable, expireSnapshotsSpec.getFromTimestampMillis(), + expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService); + } else if (expireSnapshotsSpec.isExpireByIds()) { expireSnapshotByIds(icebergTable, expireSnapshotsSpec.getIdsToExpire(), deleteExecutorService); } else { expireSnapshotOlderThanTimestamp(icebergTable, expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService); @@ -846,6 +849,23 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap } } + private void expireSnapshotByTimestampRange(Table icebergTable, Long fromTimestamp, Long toTimestamp, + ExecutorService deleteExecutorService) { + ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots(); + for (Snapshot snapshot : icebergTable.snapshots()) { + if (snapshot.timestampMillis() >= fromTimestamp && snapshot.timestampMillis() <= toTimestamp) { + expireSnapshots.expireSnapshotId(snapshot.snapshotId()); + LOG.debug("Expiring snapshot on {} with id: {} and timestamp: {}", icebergTable.name(), snapshot.snapshotId(), + snapshot.timestampMillis()); + } + } + LOG.info("Expiring snapshot on {} within time range {} -> {}", icebergTable.name(), fromTimestamp, toTimestamp); + if (deleteExecutorService != null) { + expireSnapshots.executeDeleteWith(deleteExecutorService); + } + expireSnapshots.commit(); + } + private void expireSnapshotOlderThanTimestamp(Table icebergTable, Long timestamp, ExecutorService deleteExecutorService) { ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots().expireOlderThan(timestamp); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java index 08277fb7892e..a851578ee6c6 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java @@ -20,6 +20,8 @@ package org.apache.iceberg.mr.hive; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; import org.apache.commons.collections4.IterableUtils; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; @@ -65,4 +67,19 @@ public void testExpireSnapshotsWithSnapshotId() throws IOException, InterruptedE table.refresh(); Assert.assertEquals(7, IterableUtils.size(table.snapshots())); } + + @Test + public void testExpireSnapshotsWithTimestampRange() throws IOException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "source"); + Table table = testTables.createTableWithVersions(shell, identifier.name(), + HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000"); + String fromTime = simpleDateFormat.format(new Date(table.history().get(5).timestampMillis())); + String toTime = simpleDateFormat.format(new Date(table.history().get(8).timestampMillis())); + shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN" + + " '" + fromTime + "' AND '" + toTime + "'"); + table.refresh(); + Assert.assertEquals(6, IterableUtils.size(table.snapshots())); + } } diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g index 48a2aed96173..46a00fe5c873 100644 --- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g +++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g @@ -483,6 +483,8 @@ alterStatementSuffixExecute -> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?) | KW_EXECUTE KW_CHERRY_PICK snapshotId=Number -> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId) + | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral) KW_AND (toTimestamp=StringLiteral) + -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp) ; alterStatementSuffixDropBranch diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java index 81ed88849df6..ddc12935700d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; @@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import java.time.ZoneId; +import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -79,7 +81,8 @@ protected void analyzeCommand(TableName tableName, Map partition desc = getRollbackDesc(tableName, partitionSpec, (ASTNode) command.getChild(1)); break; case HiveParser.KW_EXPIRE_SNAPSHOTS: - desc = getExpireSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1)); + desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren()); + break; case HiveParser.KW_SET_CURRENT_SNAPSHOT: desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1)); @@ -130,18 +133,25 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableNa } private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map partitionSpec, - ASTNode childNode) throws SemanticException { + List children) throws SemanticException { AlterTableExecuteSpec spec; - // the second child must be the rollback parameter ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() : SessionState.get().getConf().getLocalTimeZone(); - String childText = PlanUtils.stripQuotes(childNode.getText().trim()); - if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(childText).matches()) { - spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(childText)); + ASTNode firstNode = (ASTNode) children.get(1); + String firstNodeText = PlanUtils.stripQuotes(firstNode.getText().trim()); + if (children.size() == 3) { + ASTNode secondNode = (ASTNode) children.get(2); + String secondNodeText = PlanUtils.stripQuotes(secondNode.getText().trim()); + TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone); + TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone); + spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, + new ExpireSnapshotsSpec(fromTime.toEpochMilli(), toTime.toEpochMilli())); + } else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) { + spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(firstNodeText)); } else { - TimestampTZ time = TimestampTZUtil.parse(childText, timeZone); + TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone); spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli())); } return new AlterTableExecuteDesc(tableName, partitionSpec, spec); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java index fefeb267c290..c469b24415f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java @@ -109,6 +109,8 @@ public static class ExpireSnapshotsSpec { private long timestampMillis = -1L; private String[] idsToExpire = null; + private long fromTimestampMillis = -1L; + public ExpireSnapshotsSpec(long timestampMillis) { this.timestampMillis = timestampMillis; } @@ -117,10 +119,19 @@ public ExpireSnapshotsSpec(String ids) { this.idsToExpire = ids.split(","); } + public ExpireSnapshotsSpec(long fromTimestampMillis, long toTimestampMillis) { + this.fromTimestampMillis = fromTimestampMillis; + this.timestampMillis = toTimestampMillis; + } + public Long getTimestampMillis() { return timestampMillis; } + public Long getFromTimestampMillis() { + return fromTimestampMillis; + } + public String[] getIdsToExpire() { return idsToExpire; } @@ -129,10 +140,16 @@ public boolean isExpireByIds() { return idsToExpire != null; } + public boolean isExpireByTimestampRange() { + return timestampMillis != -1 && fromTimestampMillis != -1; + } + @Override public String toString() { MoreObjects.ToStringHelper stringHelper = MoreObjects.toStringHelper(this); - if (isExpireByIds()) { + if (isExpireByTimestampRange()) { + stringHelper.add("fromTimestampMillis", fromTimestampMillis).add("toTimestampMillis", timestampMillis); + } else if (isExpireByIds()) { stringHelper.add("idsToExpire", Arrays.toString(idsToExpire)); } else { stringHelper.add("timestampMillis", timestampMillis);