diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestHeartbeatTxnRangeFunction.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestHeartbeatTxnRangeFunction.java new file mode 100644 index 000000000000..943331aba52a --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestHeartbeatTxnRangeFunction.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; + +public class TestHeartbeatTxnRangeFunction { + + private static final HiveConf conf = new HiveConf(); + private static TxnStore txnHandler; + + @BeforeClass + public static void setUp() throws Exception { + TestTxnDbUtil.setConfValues(conf); + TestTxnDbUtil.prepDb(conf); + txnHandler = TxnUtils.getTxnStore(conf); + } + + @After + public void tearDown() throws Exception { + TestTxnDbUtil.cleanDb(conf); + } + + @Test + public void testHeartbeatTxnRangeFunction_NoSuchTxn() throws MetaException { + HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1L, 1L); + + HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request); + + assertEquals(1, response.getNosuchSize()); + } + + @Test + public void testHeartbeatTxnRangeFunction_AbortedTxn() throws MetaException, TxnAbortedException, NoSuchTxnException { + openTxn(); + openTxn(); + long txnId = openTxn(); + + txnHandler.abortTxn(new AbortTxnRequest(txnId)); + HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1L, txnId); + + HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request); + + assertEquals(1, response.getAbortedSize()); + Long txn = response.getAbortedIterator().next(); + assertEquals(3L, (long)txn); + assertEquals(0, response.getNosuch().size()); + } + + @Test + public void testHeartbeatTxnRangeFunction_Success() throws Exception { + openTxn(); + openTxn(); + long txnId = openTxn(); + + String firstHeartbeat = TestTxnDbUtil.queryToString(conf, "select \"TXN_LAST_HEARTBEAT\" from \"TXNS\" where \"TXN_ID\" = " + txnId, false); + HeartbeatTxnRangeRequest request = new HeartbeatTxnRangeRequest(1L, txnId); + + HeartbeatTxnRangeResponse response = txnHandler.heartbeatTxnRange(request); + String updatedHeartbeat = TestTxnDbUtil.queryToString(conf, "select \"TXN_LAST_HEARTBEAT\" from \"TXNS\" where \"TXN_ID\" = " + txnId, false); + + assertEquals(0, response.getAbortedSize()); + assertEquals(0, response.getNosuchSize()); + assertNotNull(firstHeartbeat); + assertNotNull(updatedHeartbeat); + + assertNotEquals(firstHeartbeat, updatedHeartbeat); + } + + @Test + public void testHeartbeatTxnRangeOneCommitted_Mixed() throws Exception { + openTxn(); + txnHandler.commitTxn(new CommitTxnRequest(1)); + openTxn(); + long txnId = openTxn(); + + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, txnId)); + + assertEquals(1, rsp.getNosuchSize()); + long noSuchTxnId = rsp.getNosuchIterator().next(); + assertEquals(1L, noSuchTxnId); + + assertEquals(0, rsp.getAbortedSize()); + } + + private long openTxn() throws MetaException { + List txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); + return txns.get(0); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 8de9cbe93e10..c0280e62fe3f 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -38,8 +38,6 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; import org.apache.hadoop.hive.metastore.api.LockRequest; @@ -1177,48 +1175,6 @@ public void testHeartbeatLock() throws Exception { } } - @Test - public void heartbeatTxnRange() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnid = openTxn(); - txnid = openTxn(); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(0, rsp.getAborted().size()); - assertEquals(0, rsp.getNosuch().size()); - } - - @Test - public void heartbeatTxnRangeOneCommitted() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnHandler.commitTxn(new CommitTxnRequest(1)); - txnid = openTxn(); - txnid = openTxn(); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(1, rsp.getNosuchSize()); - Long txn = rsp.getNosuch().iterator().next(); - assertEquals(1L, (long)txn); - assertEquals(0, rsp.getAborted().size()); - } - - @Test - public void heartbeatTxnRangeOneAborted() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnid = openTxn(); - txnid = openTxn(); - txnHandler.abortTxn(new AbortTxnRequest(3)); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(1, rsp.getAbortedSize()); - Long txn = rsp.getAborted().iterator().next(); - assertEquals(3L, (long)txn); - assertEquals(0, rsp.getNosuch().size()); - } - @Test public void testLockTimeout() throws Exception { long timeout = txnHandler.setTimeout(1); @@ -1828,6 +1784,15 @@ public void testGetMaterializationInvalidationInfoWithValidReaderWriteIdListWhen ); } + @Test + public void testHeartbeatLockMaterializationRebuild() throws MetaException { + txnHandler.lockMaterializationRebuild("default", "table1", 1L); + + boolean result = txnHandler.heartbeatLockMaterializationRebuild("default", "table1", 1L); + + assertTrue(result); + } + private void testGetMaterializationInvalidationInfoWithValidReaderWriteIdList( ValidReadTxnList currentValidTxnList, ValidReaderWriteIdList... tableWriteIdList) throws MetaException { ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(5L); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 92437bdbba03..f92833645462 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -130,6 +130,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; +import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; /** @@ -704,16 +705,16 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName, @Override public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws MetaException { int result = jdbcResource.execute( - "UPDATE \"MATERIALIZATION_REBUILD_LOCKS\"" + - " SET \"MRL_LAST_HEARTBEAT\" = " + Instant.now().toEpochMilli() + - " WHERE \"MRL_TXN_ID\" = " + txnId + - " AND \"MRL_DB_NAME\" = ?" + - " AND \"MRL_TBL_NAME\" = ?", - new MapSqlParameterSource() - .addValue("now", Instant.now().toEpochMilli()) - .addValue("txnId", txnId) - .addValue("dbName", dbName) - .addValue("tableNane", tableName), + "UPDATE \"MATERIALIZATION_REBUILD_LOCKS\"" + + " SET \"MRL_LAST_HEARTBEAT\" = :lastHeartbeat" + + " WHERE \"MRL_TXN_ID\" = :txnId" + + " AND \"MRL_DB_NAME\" = :dbName" + + " AND \"MRL_TBL_NAME\" = :tblName", + new MapSqlParameterSource() + .addValue("lastHeartbeat", Instant.now().toEpochMilli()) + .addValue("txnId", txnId) + .addValue("dbName", dbName) + .addValue("tblName", tableName), ParameterizedCommand.AT_LEAST_ONE_ROW); return result >= 1; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/HeartbeatTxnRangeFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/HeartbeatTxnRangeFunction.java index ff3c801b643c..69590d8c015f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/HeartbeatTxnRangeFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/HeartbeatTxnRangeFunction.java @@ -82,7 +82,7 @@ public HeartbeatTxnRangeResponse execute(MultiDataSourceJdbcResource jdbcResourc } if (updateCnt == numTxnsToHeartbeat) { //fast pass worked, i.e. all txns we were asked to heartbeat were Open as expected - context.rollbackToSavepoint(savePoint); + context.createSavepoint(); return rsp; } //if here, do the slow path so that we can return info txns which were not in expected state