Skip to content

Commit

Permalink
HIVE-28460: Determine the database type once the PersistenceManagerFa…
Browse files Browse the repository at this point in the history
…ctory created
  • Loading branch information
dengzhhu653 committed Sep 27, 2024
1 parent 2b6aa63 commit 80893b9
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.metastore;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransactionRollbackException;
Expand All @@ -44,6 +45,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import javax.sql.DataSource;

/** Database product inferred via JDBC. Encapsulates all SQL logic associated with
* the database product.
* This class is a singleton, which is instantiated the first time
Expand All @@ -70,6 +73,8 @@ public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
private static DatabaseProduct theDatabaseProduct;

Configuration myConf;

private String productName;
/**
* Protected constructor for singleton class
*/
Expand All @@ -83,6 +88,16 @@ protected DatabaseProduct() {}
public static final String ORACLE_NAME = "oracle";
public static final String UNDEFINED_NAME = "other";

public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
Configuration conf) {
try (Connection conn = connPool.getConnection()) {
String s = conn.getMetaData().getDatabaseProductName();
return determineDatabaseProduct(s, conf);
} catch (SQLException e) {
throw new RuntimeException("Unable to get database product name", e);
}
}

/**
* Determine the database product type
* @param productName string to defer database connection
Expand Down Expand Up @@ -144,6 +159,7 @@ public static DatabaseProduct determineDatabaseProduct(String productName,
}

theDatabaseProduct.dbType = dbt;
theDatabaseProduct.productName = productName;
}
}
return theDatabaseProduct;
Expand Down Expand Up @@ -812,4 +828,8 @@ public void unlockInternal() {
derbyLock.unlock();
}
}

public String getProductName() {
return productName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,7 @@ public MetaStoreDirectSql(PersistenceManager pm, Configuration conf, String sche
this.pm = pm;
this.conf = conf;
this.schema = schema;
DatabaseProduct dbType = null;

dbType = DatabaseProduct.determineDatabaseProduct(getProductName(pm), conf);

this.dbType = dbType;
this.dbType = PersistenceManagerProvider.getDatabaseProduct();
int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE);
this.directSqlInsertPart = new DirectSqlInsertPart(pm, dbType, batchSize);
if (batchSize == DETECT_BATCHING) {
Expand Down Expand Up @@ -272,23 +268,6 @@ private static String getFullyQualifiedName(String schema, String tblName) {
+ "\"" + tblName + "\"";
}


public MetaStoreDirectSql(PersistenceManager pm, Configuration conf) {
this(pm, conf, "");
}

static String getProductName(PersistenceManager pm) {
JDOConnection jdoConn = pm.getDataStoreConnection();
try {
return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
} catch (Throwable t) {
LOG.warn("Error retrieving product name", t);
return null;
} finally {
jdoConn.close(); // We must release the connection before we call other pm methods.
}
}

private boolean ensureDbInit() {
Transaction tx = pm.currentTransaction();
boolean doCommit = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,10 @@ private void initialize() {
LOG.info("RawStore: {}, with PersistenceManager: {}" +
" created in the thread with id: {}", this, pm, Thread.currentThread().getId());

String productName = MetaStoreDirectSql.getProductName(pm);
sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName, conf), conf);

isInitialized = pm != null;
if (isInitialized) {
dbType = determineDatabaseProduct();
dbType = PersistenceManagerProvider.getDatabaseProduct();
sqlGenerator = new SQLGenerator(dbType, conf);
expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) {
String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema");
Expand All @@ -460,22 +458,6 @@ public PropertyStore getPropertyStore() {
return propertyStore;
}

private DatabaseProduct determineDatabaseProduct() {
return DatabaseProduct.determineDatabaseProduct(getProductName(pm), conf);
}

private static String getProductName(PersistenceManager pm) {
JDOConnection jdoConn = pm.getDataStoreConnection();
try {
return ((Connection)jdoConn.getNativeConnection()).getMetaData().getDatabaseProductName();
} catch (Throwable t) {
LOG.warn("Error retrieving product name", t);
return null;
} finally {
jdoConn.close(); // We must release the connection before we call other pm methods.
}
}

/**
* Configure SSL encryption to the database store.
*
Expand Down Expand Up @@ -4449,7 +4431,6 @@ public GetHelper(String catalogName, String dbName, String tblName,
boolean isConfigEnabled = MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)
&& (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL_DDL) || !isInTxn);
if (isConfigEnabled && directSql == null) {
dbType = determineDatabaseProduct();
directSql = new MetaStoreDirectSql(pm, getConf(), "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
public class PersistenceManagerProvider {
private static PersistenceManagerFactory pmf;
private static PersistenceManagerFactory compactorPmf;
private static DatabaseProduct databaseProduct;
private static Properties prop;
private static final ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock();
private static final Lock pmfReadLock = pmfLock.readLock();
Expand Down Expand Up @@ -271,6 +272,10 @@ public static void updatePmfProperties(Configuration conf) {
if (compactorPmf == null && useCompactorPool) {
compactorPmf = retry(() -> initPMF(conf, true));
}
if (databaseProduct == null) {
String url = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY);
databaseProduct = DatabaseProduct.determineDatabaseProduct(url, conf);
}
}
// downgrade by acquiring read lock before releasing write lock
pmfReadLock.lock();
Expand All @@ -288,7 +293,7 @@ public static void updatePmfProperties(Configuration conf) {

private static PersistenceManagerFactory initPMF(Configuration conf, boolean forCompactor) {
DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
PersistenceManagerFactory pmf;
PersistenceManagerFactory pmf = null;

// Any preexisting datanucleus property should be passed along
Map<Object, Object> dsProp = new HashMap<>(prop);
Expand All @@ -302,23 +307,33 @@ private static PersistenceManagerFactory initPMF(Configuration conf, boolean for
pmf = JDOHelper.getPersistenceManagerFactory(dsProp);
} else {
String sourceName = forCompactor ? "objectstore-compactor" : "objectstore";
DataSource ds = null, ds2 = null;
try (DataSourceProvider.DataSourceNameConfigurator configurator =
new DataSourceProvider.DataSourceNameConfigurator(conf, sourceName)) {
DataSource ds = (maxPoolSize > 0) ? dsp.create(conf, maxPoolSize) : dsp.create(conf);
ds = (maxPoolSize > 0) ? dsp.create(conf, maxPoolSize) : dsp.create(conf);
// The secondary connection factory is used for schema generation, and for value generation operations.
// We should use a different pool for the secondary connection factory to avoid resource starvation.
// Since DataNucleus uses locks for schema generation and value generation, 2 connections should be sufficient.
configurator.resetName("objectstore-secondary");
DataSource ds2 = forCompactor ? ds : dsp.create(conf, /* maxPoolSize */ 2);
ds2 = forCompactor ? ds : dsp.create(conf, /* maxPoolSize */ 2);
dsProp.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
dsProp.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds2);
databaseProduct = DatabaseProduct.determineDatabaseProduct(ds, conf);
dsProp.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(),
"org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
pmf = JDOHelper.getPersistenceManagerFactory(dsProp);
} catch (SQLException e) {
LOG.warn("Could not create PersistenceManagerFactory using "
+ "connection pool properties, will fall back", e);
pmf = JDOHelper.getPersistenceManagerFactory(prop);
} finally {
if (pmf == null && ds instanceof AutoCloseable) {
try (AutoCloseable close1 = (AutoCloseable) ds;
AutoCloseable close2 = (AutoCloseable) ds2 ) {
} catch (Exception e) {
LOG.warn("Failed to close the DataSource", e);
}
}
}
}
DataStoreCache dsc = pmf.getDataStoreCache();
Expand Down Expand Up @@ -499,6 +514,14 @@ public static PersistenceManager getPersistenceManager(boolean forCompactor) {
}
}

public static DatabaseProduct getDatabaseProduct() {
if (databaseProduct == null) {
throw new RuntimeException(
"Cannot determine the database product. PersistenceManagerFactory has not initialized yet");
}
return databaseProduct;
}

/**
* Properties specified in hive-default.xml override the properties specified
* in jpox.properties.
Expand Down Expand Up @@ -625,6 +648,7 @@ private static <T> T retry(Supplier<T> s) {
LOG.warn("Exception retry limit reached, not retrying any longer.", e);
} else {
LOG.debug("Non-retriable exception.", e);
break;
}
ex = e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,10 @@ public void setConf(Configuration conf) {
}
}
if (dbProduct == null) {
try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPool)) {
determineDatabaseProduct(dbConn);
} catch (SQLException e) {
LOG.error("Unable to determine database product", e);
throw new RuntimeException(e);
dbProduct = DatabaseProduct.determineDatabaseProduct(connPool, conf);
if (dbProduct.isUNDEFINED()) {
String msg = "Unrecognized database product name <" + dbProduct.getProductName() + ">";
throw new IllegalStateException(msg);
}
}
if (sqlGenerator == null) {
Expand Down Expand Up @@ -1065,22 +1064,6 @@ protected Timestamp getDbTime() throws MetaException {
(ResultSet rs, int rowNum) -> rs.getTimestamp(1));
}

private void determineDatabaseProduct(Connection conn) {
try {
String s = conn.getMetaData().getDatabaseProductName();
dbProduct = DatabaseProduct.determineDatabaseProduct(s, conf);
if (dbProduct.isUNDEFINED()) {
String msg = "Unrecognized database product name <" + s + ">";
LOG.error(msg);
throw new IllegalStateException(msg);
}
} catch (SQLException e) {
String msg = "Unable to get database product name";
LOG.error(msg, e);
throw new IllegalStateException(msg, e);
}
}

private void initJdbcResource() {
if (jdbcResource == null) {
jdbcResource = new MultiDataSourceJdbcResource(dbProduct, conf, sqlGenerator);
Expand Down

0 comments on commit 80893b9

Please sign in to comment.