diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index e24709058bef..fd2a2f9d1d07 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -1055,6 +1055,38 @@ public void testFindReadyToCleanAborts_limitFetchSize() throws Exception { assertEquals(1, potentials.size()); } + @Test + public void testFindReadyToCleanAborts() throws Exception { + long txnId = openTxn(); + + List components = new ArrayList<>(); + components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE)); + components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE)); + + allocateTableWriteIds("mydb", "mytable", txnId); + allocateTableWriteIds("mydb", "yourtable", txnId); + + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnId); + LockResponse res = txnHandler.lock(req); + assertSame(res.getState(), LockState.ACQUIRED); + + txnHandler.abortTxn(new AbortTxnRequest((txnId))); + + txnId = openTxn(); + components = new ArrayList<>(); + components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE)); + allocateTableWriteIds("mydb", "mytable", txnId); + + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnId); + res = txnHandler.lock(req); + assertSame(res.getState(), LockState.ACQUIRED); + + List potentials = txnHandler.findReadyToCleanAborts(1, 0); + assertEquals(1, potentials.size()); + } + private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) { FindNextCompactRequest request = new FindNextCompactRequest(); request.setWorkerId(workerId); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index f224c7b5e278..5df0e028322f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -104,13 +104,13 @@ class CompactionTxnHandler extends TxnHandler { " FROM " + // First sub-query - Gets the aborted txns with min txn start time, number of aborted txns // for corresponding db, table, partition. - " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " + + " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", MAX(\"TC_WRITEID\") AS \"MAX_ABORTED_WRITE_ID\", " + " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " + " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " + " LEFT JOIN" + // Second sub-query - Gets the min open txn id for corresponding db, table, partition. - "( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\" " + + "( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\", MAX(\"TC_WRITEID\") AS \"MAX_OPEN_WRITE_ID\" " + " FROM \"TXNS\", \"TXN_COMPONENTS\" " + " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.OPEN + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"res2\"" + @@ -129,7 +129,8 @@ class CompactionTxnHandler extends TxnHandler { " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " + " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " + " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" + - " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL"; + " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL" + + " AND (\"res2\".\"MAX_OPEN_WRITE_ID\" IS NULL OR \"res2\".\"MAX_OPEN_WRITE_ID\" <= \"res1\".\"MAX_ABORTED_WRITE_ID\")"; private static final String DELETE_CQ_ENTRIES = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";