Skip to content

Commit

Permalink
HIVE-28190: Fix MaterializationRebuild lock heartbeat (Zsolt Miskolczi, reviewed by Attila Turoczy, Denys Kuzmenko, Krisztian Kasa, Zoltan Ratkai)
Browse files Browse the repository at this point in the history

Closes #5186
  • Loading branch information
InvisibleProgrammer authored Apr 22, 2024
1 parent 2134e3d commit 1a969f6
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore.txn;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;

public class TestHeartbeatTxnRangeFunction {

private static final HiveConf conf = new HiveConf();
private static TxnStore txnHandler;

@BeforeClass
public static void setUp() throws Exception {
TestTxnDbUtil.setConfValues(conf);
TestTxnDbUtil.prepDb(conf);
txnHandler = TxnUtils.getTxnStore(conf);
}

@After
public void tearDown() throws Exception {
TestTxnDbUtil.cleanDb(conf);
}

@Test
public void testHeartbeatTxnRangeFunction_NoSuchTxn() throws MetaException {
HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1L, 1L);

HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request);

assertEquals(1, response.getNosuchSize());
}

@Test
public void testHeartbeatTxnRangeFunction_AbortedTxn() throws MetaException, TxnAbortedException, NoSuchTxnException {
openTxn();
openTxn();
long txnId = openTxn();

txnHandler.abortTxn(new AbortTxnRequest(txnId));
HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1L, txnId);

HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request);

assertEquals(1, response.getAbortedSize());
Long txn = response.getAbortedIterator().next();
assertEquals(3L, (long)txn);
assertEquals(0, response.getNosuch().size());
}

@Test
public void testHeartbeatTxnRangeFunction_Success() throws Exception {
openTxn();
openTxn();
long txnId = openTxn();

String firstHeartbeat = TestTxnDbUtil.queryToString(conf, "select \"TXN_LAST_HEARTBEAT\" from \"TXNS\" where \"TXN_ID\" = " + txnId, false);
HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1L, txnId);

HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request);
String updatedHeartbeat = TestTxnDbUtil.queryToString(conf, "select \"TXN_LAST_HEARTBEAT\" from \"TXNS\" where \"TXN_ID\" = " + txnId, false);

assertEquals(0, response.getAbortedSize());
assertEquals(0, response.getNosuchSize());
assertNotNull(firstHeartbeat);
assertNotNull(updatedHeartbeat);

assertNotEquals(firstHeartbeat, updatedHeartbeat);
}

@Test
public void testHeartbeatTxnRangeOneCommitted_Mixed() throws Exception {
openTxn();
txnHandler.commitTxn(new CommitTxnRequest(1));
openTxn();
long txnId = openTxn();

HeartbeatTxnRangeResponse rsp =
txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, txnId));

assertEquals(1, rsp.getNosuchSize());
long noSuchTxnId = rsp.getNosuchIterator().next();
assertEquals(1L, noSuchTxnId);

assertEquals(0, rsp.getAbortedSize());
}

private long openTxn() throws MetaException {
List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
return txns.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
Expand Down Expand Up @@ -1177,48 +1175,6 @@ public void testHeartbeatLock() throws Exception {
}
}

@Test
public void heartbeatTxnRange() throws Exception {
  // Open three txns; the very first id handed out must be 1.
  long firstTxnId = openTxn();
  assertEquals(1, firstTxnId);
  openTxn();
  openTxn();

  HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1, 3);
  HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request);

  // All three txns are open, so nothing is aborted or missing.
  assertEquals(0, response.getAborted().size());
  assertEquals(0, response.getNosuch().size());
}

@Test
public void heartbeatTxnRangeOneCommitted() throws Exception {
  // Open a txn, commit it, then open two more (ids 2 and 3).
  long firstTxnId = openTxn();
  assertEquals(1, firstTxnId);
  txnHandler.commitTxn(new CommitTxnRequest(1));
  openTxn();
  openTxn();

  HeartbeatTxnRangeResponse response =
      txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));

  // The committed txn is no longer present, so it shows up as "nosuch".
  assertEquals(1, response.getNosuchSize());
  long missingTxnId = response.getNosuch().iterator().next();
  assertEquals(1L, missingTxnId);
  assertEquals(0, response.getAborted().size());
}

@Test
public void heartbeatTxnRangeOneAborted() throws Exception {
  // Open three txns (ids 1..3) and abort the last one.
  long firstTxnId = openTxn();
  assertEquals(1, firstTxnId);
  openTxn();
  openTxn();
  txnHandler.abortTxn(new AbortTxnRequest(3));

  HeartbeatTxnRangeResponse response =
      txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));

  // Exactly the aborted txn must be reported in the aborted bucket.
  assertEquals(1, response.getAbortedSize());
  long abortedTxnId = response.getAborted().iterator().next();
  assertEquals(3L, abortedTxnId);
  assertEquals(0, response.getNosuch().size());
}

@Test
public void testLockTimeout() throws Exception {
long timeout = txnHandler.setTimeout(1);
Expand Down Expand Up @@ -1828,6 +1784,15 @@ public void testGetMaterializationInvalidationInfoWithValidReaderWriteIdListWhen
);
}

@Test
public void testHeartbeatLockMaterializationRebuild() throws MetaException {
  // Acquire the materialization-rebuild lock for txn 1 on default.table1,
  // then verify that heartbeating that same lock reports success.
  txnHandler.lockMaterializationRebuild("default", "table1", 1L);

  boolean heartbeatSucceeded =
      txnHandler.heartbeatLockMaterializationRebuild("default", "table1", 1L);

  assertTrue(heartbeatSucceeded);
}

private void testGetMaterializationInvalidationInfoWithValidReaderWriteIdList(
ValidReadTxnList currentValidTxnList, ValidReaderWriteIdList... tableWriteIdList) throws MetaException {
ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(5L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;

import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;

/**
Expand Down Expand Up @@ -704,16 +705,16 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName,
@Override
public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws MetaException {
  // Refresh the last-heartbeat timestamp of the materialization-rebuild lock
  // held by the given txn on the given table.
  //
  // The pasted span contained two overlapping method bodies (a diff artifact):
  // an older version that concatenated the heartbeat value and txn id directly
  // into the SQL string while declaring unused bind parameters (including the
  // misspelled "tableNane"), and the corrected version below. Only the
  // corrected, fully-parameterized statement is kept: every value is bound via
  // a named parameter, so the update actually filters on db/table name.
  int result = jdbcResource.execute(
      "UPDATE \"MATERIALIZATION_REBUILD_LOCKS\"" +
          " SET \"MRL_LAST_HEARTBEAT\" = :lastHeartbeat" +
          " WHERE \"MRL_TXN_ID\" = :txnId" +
          " AND \"MRL_DB_NAME\" = :dbName" +
          " AND \"MRL_TBL_NAME\" = :tblName",
      new MapSqlParameterSource()
          .addValue("lastHeartbeat", Instant.now().toEpochMilli())
          .addValue("txnId", txnId)
          .addValue("dbName", dbName)
          .addValue("tblName", tableName),
      // Heartbeat succeeds only if the lock row existed and was updated.
      ParameterizedCommand.AT_LEAST_ONE_ROW);
  return result >= 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public HeartbeatTxnRangeResponse execute(MultiDataSourceJdbcResource jdbcResourc
}
if (updateCnt == numTxnsToHeartbeat) {
//fast pass worked, i.e. all txns we were asked to heartbeat were Open as expected
context.rollbackToSavepoint(savePoint);
context.createSavepoint();
return rsp;
}
//if here, do the slow path so that we can return info txns which were not in expected state
Expand Down

0 comments on commit 1a969f6

Please sign in to comment.