
HIVE-28456: ObjectStore updatePartitionColumnStatisticsInBatch can cause connection starvation (apache#5398) (Zhihua Deng, reviewed by Denys Kuzmenko)
dengzhhu653 committed Sep 20, 2024
1 parent b1c7eb3 commit 538286c
Showing 1 changed file with 99 additions and 94 deletions.
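
The fix replaces manual commit/rollback on the native JDBC connection with lifecycle management through the JDO Transaction. As a minimal sketch of the pre-fix shape (illustrative only, simplified from the diff below; assuming a DataNucleus-style PersistenceManager backed by a connection pool): when the borrowed connection is committed behind JDO's back and only released on some code paths, failure paths can leave pooled connections checked out until the pool is exhausted, which is the starvation the title describes.

// Illustrative sketch only, not the exact Hive code: the pre-fix shape that can
// starve a connection pool. The native connection is committed/rolled back
// directly, bypassing the JDO Transaction, and is returned to the pool only if
// control reaches jdoConn.close().
import javax.jdo.PersistenceManager;
import javax.jdo.datastore.JDOConnection;
import java.sql.Connection;
import java.sql.SQLException;

class StarvationSketch {
  static void directSqlWork(PersistenceManager pm) throws SQLException {
    JDOConnection jdoConn = null;
    Connection dbConn = null;
    boolean committed = false;
    try {
      jdoConn = pm.getDataStoreConnection();   // borrows a pooled connection
      dbConn = (Connection) jdoConn.getNativeConnection();
      // ... direct SQL against dbConn ...
      dbConn.commit();                         // commits behind JDO's back
      committed = true;
    } finally {
      if (!committed && dbConn != null) {
        dbConn.rollback();                     // likewise bypasses JDO
      }
      if (jdoConn != null) {
        jdoConn.close();                       // the only release point back to the pool
      }
    }
  }
}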
@@ -51,6 +51,7 @@
 import org.apache.hadoop.conf.Configuration;

 import javax.jdo.PersistenceManager;
+import javax.jdo.Transaction;
 import javax.jdo.datastore.JDOConnection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -101,14 +102,6 @@ public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
     sqlGenerator = new SQLGenerator(dbType, conf);
   }

-  void rollbackDBConn(Connection dbConn) {
-    try {
-      if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
-    } catch (SQLException e) {
-      LOG.warn("Failed to rollback db connection ", e);
-    }
-  }
-
   void closeDbConn(JDOConnection jdoConn) {
     try {
       if (jdoConn != null) {
@@ -489,60 +482,69 @@ public Map<String, Map<String, String>> updatePartitionColumnStatistics(Map<Stri
                                                        String validWriteIds, long writeId,
                                                        List<TransactionalMetaStoreEventListener> transactionalListeners)
       throws MetaException {
-    JDOConnection jdoConn = null;
-    Connection dbConn = null;
-    boolean committed = false;
-
+    Transaction tx = pm.currentTransaction();
+    boolean doCommit = false;
     try {
       dbType.lockInternal();
-      jdoConn = pm.getDataStoreConnection();
-      dbConn = (Connection) (jdoConn.getNativeConnection());
-
-      setAnsiQuotes(dbConn);
-
-      Map<PartitionInfo, ColumnStatistics> partitionInfoMap = getPartitionInfo(dbConn, tbl.getId(), partColStatsMap);
-
-      Map<String, Map<String, String>> result =
-              updatePartitionParamTable(dbConn, partitionInfoMap, validWriteIds, writeId, TxnUtils.isAcidTable(tbl));
-
-      Map<PartColNameInfo, MPartitionColumnStatistics> insertMap = new HashMap<>();
-      Map<PartColNameInfo, MPartitionColumnStatistics> updateMap = new HashMap<>();
-      populateInsertUpdateMap(partitionInfoMap, updateMap, insertMap, dbConn, tbl);
-
-      LOG.info("Number of stats to insert " + insertMap.size() + " update " + updateMap.size());
-
-      if (insertMap.size() != 0) {
-        insertIntoPartColStatTable(insertMap, csId, dbConn);
-      }
-
-      if (updateMap.size() != 0) {
-        updatePartColStatTable(updateMap, dbConn);
-      }
-
-      if (transactionalListeners != null) {
-        UpdatePartitionColumnStatEventBatch eventBatch = new UpdatePartitionColumnStatEventBatch(null);
-        for (Map.Entry entry : result.entrySet()) {
-          Map<String, String> parameters = (Map<String, String>) entry.getValue();
-          ColumnStatistics colStats = partColStatsMap.get(entry.getKey());
-          List<String> partVals = getPartValsFromName(tbl, colStats.getStatsDesc().getPartName());
-          UpdatePartitionColumnStatEvent event = new UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
-                  tbl, writeId, null);
-          eventBatch.addPartColStatEvent(event);
-        }
-        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT_BATCH, eventBatch, dbConn, sqlGenerator);
-      }
-      dbConn.commit();
-      committed = true;
+      if (!tx.isActive()) {
+        tx.begin();
+        doCommit = true;
+      }
+      JDOConnection jdoConn = null;
+      Map<String, Map<String, String>> result;
+      try {
+        jdoConn = pm.getDataStoreConnection();
+        Connection dbConn = (Connection) jdoConn.getNativeConnection();
+        setAnsiQuotes(dbConn);
+
+        Map<PartitionInfo, ColumnStatistics> partitionInfoMap = getPartitionInfo(dbConn, tbl.getId(), partColStatsMap);
+
+        result = updatePartitionParamTable(dbConn, partitionInfoMap, validWriteIds,
+            writeId, TxnUtils.isAcidTable(tbl));
+
+        Map<PartColNameInfo, MPartitionColumnStatistics> insertMap = new HashMap<>();
+        Map<PartColNameInfo, MPartitionColumnStatistics> updateMap = new HashMap<>();
+        populateInsertUpdateMap(partitionInfoMap, updateMap, insertMap, dbConn, tbl);

+        LOG.info("Number of stats to insert " + insertMap.size() + " update " + updateMap.size());
+
+        if (insertMap.size() != 0) {
+          insertIntoPartColStatTable(insertMap, csId, dbConn);
+        }
+
+        if (updateMap.size() != 0) {
+          updatePartColStatTable(updateMap, dbConn);
+        }
+
+        if (transactionalListeners != null) {
+          UpdatePartitionColumnStatEventBatch eventBatch = new UpdatePartitionColumnStatEventBatch(null);
+          for (Map.Entry entry : result.entrySet()) {
+            Map<String, String> parameters = (Map<String, String>) entry.getValue();
+            ColumnStatistics colStats = partColStatsMap.get(entry.getKey());
+            List<String> partVals = getPartValsFromName(tbl, colStats.getStatsDesc().getPartName());
+            UpdatePartitionColumnStatEvent event = new UpdatePartitionColumnStatEvent(colStats, partVals, parameters,
+                tbl, writeId, null);
+            eventBatch.addPartColStatEvent(event);
+          }
+          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+              EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT_BATCH, eventBatch, dbConn, sqlGenerator);
+        }
+      } finally {
+        closeDbConn(jdoConn);
+      }
+      if (doCommit) {
+        tx.commit();
+      }
       return result;
     } catch (Exception e) {
       LOG.error("Unable to update Column stats for " + tbl.getTableName(), e);
       throw new MetaException("Unable to update Column stats for " + tbl.getTableName()
           + " due to: " + e.getMessage());
     } finally {
-      if (!committed) {
-        rollbackDBConn(dbConn);
+      if (doCommit && tx.isActive()) {
+        tx.rollback();
       }
-      closeDbConn(jdoConn);
       dbType.unlockInternal();
     }
   }
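
A note on the doCommit guard in the hunk above: the method begins the JDO transaction only when no transaction is already active, and commits or rolls back only if it was the one that called begin(). That lets the direct-SQL path run standalone or participate in an enclosing transaction without committing the caller's work early. A minimal sketch of that composition, with hypothetical method names (doWork and caller are not part of this patch):

// Minimal sketch (hypothetical names) of the begin-only-if-not-active pattern:
// a method that manages the transaction only when it is the outermost participant.
import javax.jdo.PersistenceManager;
import javax.jdo.Transaction;

class NestedTxSketch {
  static void doWork(PersistenceManager pm) {
    Transaction tx = pm.currentTransaction();
    boolean doCommit = false;
    if (!tx.isActive()) {      // nobody above us owns the transaction
      tx.begin();
      doCommit = true;
    }
    try {
      // ... direct SQL via pm.getDataStoreConnection() ...
      if (doCommit) {
        tx.commit();           // commit only a transaction we started
      }
    } finally {
      if (doCommit && tx.isActive()) {
        tx.rollback();         // roll back only failure paths we own
      }
    }
  }

  static void caller(PersistenceManager pm) {
    Transaction outer = pm.currentTransaction();
    outer.begin();
    doWork(pm);                // participates in outer; does not commit it
    outer.commit();            // the caller decides when the work becomes visible
  }
}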
@@ -553,72 +555,75 @@ public Map<String, Map<String, String>> updatePartitionColumnStatistics(Map<Stri
    */
   public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws MetaException {
     long maxCsId = 0;
-    boolean committed = false;
-    Connection dbConn = null;
-    JDOConnection jdoConn = null;
-
+    Transaction tx = pm.currentTransaction();
+    boolean doCommit = false;
     try {
       dbType.lockInternal();
-      jdoConn = pm.getDataStoreConnection();
-      dbConn = (Connection) (jdoConn.getNativeConnection());
-
-      setAnsiQuotes(dbConn);
-
-      // This loop will be iterated at max twice. If there is no records, it will first insert and then do a select.
-      // We are not using any upsert operations as select for update and then update is required to make sure that
-      // the caller gets a reserved range for CSId not used by any other thread.
-      boolean insertDone = false;
-      while (maxCsId == 0) {
-        String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" "
-            + "WHERE \"SEQUENCE_NAME\"= "
-            + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
-        LOG.debug("Execute query: " + query);
-        try (Statement statement = dbConn.createStatement();
-             ResultSet rs = statement.executeQuery(query)) {
-          if (rs.next()) {
-            maxCsId = rs.getLong(1);
-          } else if (insertDone) {
-            throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics");
-          } else {
-            insertDone = true;
-            query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") VALUES ( "
-                + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics") + "," + 1
-                + ")";
-            try {
-              statement.executeUpdate(query);
-            } catch (SQLException e) {
-              // If the record is already inserted by some other thread continue to select.
-              if (dbType.isDuplicateKeyError(e)) {
-                continue;
-              }
-              LOG.error("Unable to insert into SEQUENCE_TABLE for MPartitionColumnStatistics.", e);
-              throw e;
-            }
-          }
-        }
-      }
-
-      long nextMaxCsId = maxCsId + numStats + 1;
-      String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
-          + nextMaxCsId
-          + " WHERE \"SEQUENCE_NAME\" = "
-          + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
-
-      try (Statement statement = dbConn.createStatement()) {
-        statement.executeUpdate(query);
-      }
-      dbConn.commit();
-      committed = true;
+      if (!tx.isActive()) {
+        tx.begin();
+        doCommit = true;
+      }
+      JDOConnection jdoConn = null;
+      try {
+        jdoConn = pm.getDataStoreConnection();
+        Connection dbConn = (Connection) jdoConn.getNativeConnection();
+
+        setAnsiQuotes(dbConn);
+
+        // This loop will be iterated at max twice. If there is no records, it will first insert and then do a select.
+        // We are not using any upsert operations as select for update and then update is required to make sure that
+        // the caller gets a reserved range for CSId not used by any other thread.
+        boolean insertDone = false;
+        while (maxCsId == 0) {
+          String query = sqlGenerator.addForUpdateClause(
+              "SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" " + "WHERE \"SEQUENCE_NAME\"= " + quoteString(
+                  "org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
+          LOG.debug("Execute query: " + query);
+          try (Statement statement = dbConn.createStatement(); ResultSet rs = statement.executeQuery(query)) {
+            if (rs.next()) {
+              maxCsId = rs.getLong(1);
+            } else if (insertDone) {
+              throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics");
+            } else {
+              insertDone = true;
+              query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") VALUES ( " + quoteString(
+                  "org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics") + "," + 1 + ")";
+              try {
+                statement.executeUpdate(query);
+              } catch (SQLException e) {
+                // If the record is already inserted by some other thread continue to select.
+                if (dbType.isDuplicateKeyError(e)) {
+                  continue;
+                }
+                LOG.error("Unable to insert into SEQUENCE_TABLE for MPartitionColumnStatistics.", e);
+                throw e;
+              }
+            }
+          }
+        }
+
+        long nextMaxCsId = maxCsId + numStats + 1;
+        String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = " + nextMaxCsId + " WHERE \"SEQUENCE_NAME\" = " + quoteString(
+            "org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
+
+        try (Statement statement = dbConn.createStatement()) {
+          statement.executeUpdate(query);
+        }
+      } finally {
+        closeDbConn(jdoConn);
+      }
+      if (doCommit) {
+        tx.commit();
+      }
       return maxCsId;
     } catch (Exception e) {
       LOG.error("Unable to getNextCSIdForMPartitionColumnStatistics", e);
       throw new MetaException("Unable to getNextCSIdForMPartitionColumnStatistics "
           + " due to: " + e.getMessage());
     } finally {
-      if (!committed) {
-        rollbackDBConn(dbConn);
+      if (doCommit && tx.isActive()) {
+        tx.rollback();
       }
-      closeDbConn(jdoConn);
       dbType.unlockInternal();
     }
   }
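
The SEQUENCE_TABLE loop above reserves a contiguous block of IDs per call: the SELECT is issued with a FOR UPDATE clause so concurrent callers serialize on the sequence row, and the duplicate-key retry handles the race to insert the very first row. A standalone JDBC sketch of the same reserve-a-range idea follows (hypothetical table and sequence names, no Hive helpers; assumes autoCommit is off and the database supports SELECT ... FOR UPDATE):

// Standalone JDBC sketch of the reserve-a-range pattern (hypothetical names).
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class SequenceRangeSketch {
  // Returns the first ID of a reserved block of `count` IDs; the caller may use
  // [start, start + count). The row lock taken by FOR UPDATE holds until commit.
  static long reserveRange(Connection conn, long count) throws SQLException {
    long start = 0;
    boolean insertDone = false;
    while (start == 0) {
      try (Statement st = conn.createStatement();
           ResultSet rs = st.executeQuery(
               "SELECT NEXT_VAL FROM SEQUENCE_TABLE WHERE SEQUENCE_NAME = 'demo.seq' FOR UPDATE")) {
        if (rs.next()) {
          start = rs.getLong(1);             // row is now locked until commit
        } else if (insertDone) {
          throw new SQLException("sequence row vanished after insert");
        } else {
          insertDone = true;
          try {
            st.executeUpdate("INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('demo.seq', 1)");
          } catch (SQLException e) {
            // Another thread inserted first; loop back to the locking select.
            // A real implementation would rethrow non-duplicate-key errors.
          }
        }
      }
    }
    try (Statement st = conn.createStatement()) {
      st.executeUpdate("UPDATE SEQUENCE_TABLE SET NEXT_VAL = " + (start + count)
          + " WHERE SEQUENCE_NAME = 'demo.seq'");
    }
    return start;
  }
}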
