Skip to content

Commit

Permalink
HIVE-27637: Compare highest write ID of compaction records when tryin…
Browse files Browse the repository at this point in the history
…g to perform abort cleanup
  • Loading branch information
InvisibleProgrammer committed Sep 22, 2023
1 parent 6c35cc4 commit 545e0da
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,38 @@ public void testFindReadyToCleanAborts_limitFetchSize() throws Exception {
assertEquals(1, potentials.size());
}

@Test
public void testFindReadyToCleanAborts() throws Exception {
long txnId = openTxn();

List<LockComponent> components = new ArrayList<>();
components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE));

allocateTableWriteIds("mydb", "mytable", txnId);
allocateTableWriteIds("mydb", "yourtable", txnId);

LockRequest req = new LockRequest(components, "me", "localhost");
req.setTxnid(txnId);
LockResponse res = txnHandler.lock(req);
assertSame(res.getState(), LockState.ACQUIRED);

txnHandler.abortTxn(new AbortTxnRequest((txnId)));

txnId = openTxn();
components = new ArrayList<>();
components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
allocateTableWriteIds("mydb", "mytable", txnId);

req = new LockRequest(components, "me", "localhost");
req.setTxnid(txnId);
res = txnHandler.lock(req);
assertSame(res.getState(), LockState.ACQUIRED);

List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0);
assertEquals(1, potentials.size());
}

private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
FindNextCompactRequest request = new FindNextCompactRequest();
request.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ class CompactionTxnHandler extends TxnHandler {
" FROM " +
// First sub-query - Gets the aborted txns with min txn start time, number of aborted txns
// for corresponding db, table, partition.
" ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
" ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", MAX(\"TC_WRITEID\") AS \"MAX_ABORTED_WRITE_ID\", " +
" COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
" WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED +
" GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " +
" LEFT JOIN" +
// Second sub-query - Gets the min open txn id for corresponding db, table, partition.
"( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\" " +
"( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\", MAX(\"TC_WRITEID\") AS \"MAX_OPEN_WRITE_ID\" " +
" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
" WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.OPEN +
" GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"res2\"" +
Expand All @@ -129,7 +129,8 @@ class CompactionTxnHandler extends TxnHandler {
" AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
" AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
" OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" +
" WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
" WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL" +
" AND (\"res2\".\"MAX_OPEN_WRITE_ID\" IS NULL OR \"res2\".\"MAX_OPEN_WRITE_ID\" <= \"res1\".\"MAX_ABORTED_WRITE_ID\")";

private static final String DELETE_CQ_ENTRIES = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";

Expand Down

0 comments on commit 545e0da

Please sign in to comment.