Skip to content

Commit

Permalink
HIVE-27597: Implement data connector for Hive to Hive federation over… (
Browse files Browse the repository at this point in the history
#4720)

* HIVE-27597: Implement data connector for Hive to Hive federation over JDBC (Naveen Gangam)
  • Loading branch information
nrg4878 authored Oct 9, 2023
1 parent dec006e commit 076f0ac
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ protected Table buildTableFromColsList(String tableName, List<FieldSchema> cols)
abstract protected String getOutputClass();

abstract protected String getTableLocation(String tblName);

abstract protected String getDatasourceType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.DERBY_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.HIVE_JDBC_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.MSSQL_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.MYSQL_TYPE;
import static org.apache.hadoop.hive.metastore.dataconnector.IDataConnectorProvider.ORACLE_TYPE;
Expand Down Expand Up @@ -98,6 +99,7 @@ public static synchronized IDataConnectorProvider getDataConnectorProvider(Datab
String type = connector.getType();
switch (type) {
case DERBY_TYPE:
case HIVE_JDBC_TYPE:
case MSSQL_TYPE:
case MYSQL_TYPE:
case ORACLE_TYPE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public interface IDataConnectorProvider {
public static final String ORACLE_TYPE = "oracle";
public static final String MSSQL_TYPE = "mssql";
public static final String DERBY_TYPE = "derby";
public static final String HIVE_JDBC_TYPE = "hivejdbc";

DataConnector connector = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.hive.metastore.api.DataConnector;
import org.apache.hadoop.hive.metastore.dataconnector.jdbc.DerbySQLConnectorProvider;
import org.apache.hadoop.hive.metastore.dataconnector.jdbc.HiveJDBCConnectorProvider;
import org.apache.hadoop.hive.metastore.dataconnector.jdbc.MySQLConnectorProvider;
import org.apache.hadoop.hive.metastore.dataconnector.jdbc.PostgreSQLConnectorProvider;
import org.apache.hadoop.hive.metastore.dataconnector.jdbc.OracleConnectorProvider;
Expand Down Expand Up @@ -51,6 +52,10 @@ public static IDataConnectorProvider get(String dbName, DataConnector connector)
provider = new MSSQLConnectorProvider(dbName, connector);
break;

case HIVE_JDBC_TYPE:
provider = new HiveJDBCConnectorProvider(dbName, connector);
break;

default:
throw new RuntimeException("Unsupported JDBC type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public abstract class AbstractJDBCConnectorProvider extends AbstractDataConnecto
private static final String JDBC_INPUTFORMAT_CLASS = "org.apache.hive.storage.jdbc.JdbcInputFormat".intern();
private static final String JDBC_OUTPUTFORMAT_CLASS = "org.apache.hive.storage.jdbc.JdbcOutputFormat".intern();

String type = null; // MYSQL, POSTGRES, ORACLE, DERBY, MSSQL, DB2 etc.
String type = null; // MYSQL, POSTGRES, ORACLE, DERBY, MSSQL, DB2, HIVEJDBC etc.
String jdbcUrl = null;
String username = null;
String password = null; // TODO convert to byte array
Expand Down Expand Up @@ -111,7 +111,7 @@ public AbstractJDBCConnectorProvider(String dbName, DataConnector dataConn, Stri
try {
Class.forName(driverClassName);
} catch (ClassNotFoundException cnfe) {
LOG.warn("Driver class not found in classpath:" + driverClassName);
LOG.warn("Driver class not found in classpath: {}" + driverClassName);
throw new RuntimeException("Driver class not found:" + driverClass.getClass().getName(), cnfe);
}
}
Expand All @@ -121,7 +121,7 @@ public AbstractJDBCConnectorProvider(String dbName, DataConnector dataConn, Stri
close();
handle = DriverManager.getDriver(jdbcUrl).connect(jdbcUrl, getConnectionProperties());
} catch (SQLException sqle) {
LOG.warn("Could not connect to remote data source at " + jdbcUrl);
LOG.warn("Could not connect to remote data source at {}", jdbcUrl);
throw new ConnectException("Could not connect to remote datasource at " + jdbcUrl + ",cause:" + sqle.getMessage());
}
}
Expand Down Expand Up @@ -155,7 +155,7 @@ protected boolean isClosed() {
if (handle instanceof Connection)
return ((Connection) handle).isClosed();
} catch (SQLException e) {
LOG.warn("Could not determine whether jdbc connection is closed or not to "+ jdbcUrl, e);
LOG.warn("Could not determine whether jdbc connection, to {}, is closed or not: {} ", jdbcUrl, e);
}
return true;
}
Expand All @@ -165,7 +165,7 @@ protected boolean isClosed() {
try {
((Connection)handle).close();
} catch (SQLException sqle) {
LOG.warn("Could not close jdbc connection to " + jdbcUrl, sqle);
LOG.warn("Could not close jdbc connection to {}: {}", jdbcUrl, sqle);
throw new RuntimeException(sqle);
}
}
Expand All @@ -191,7 +191,7 @@ protected boolean isClosed() {
return tables;
}
} catch (SQLException sqle) {
LOG.warn("Could not retrieve tables from remote datasource, cause:" + sqle.getMessage());
LOG.warn("Could not retrieve tables from remote datasource, cause: {}", sqle.getMessage());
throw new MetaException("Error retrieving remote table:" + sqle);
} finally {
try {
Expand Down Expand Up @@ -221,7 +221,7 @@ protected boolean isClosed() {
return tables;
}
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + sqle.getMessage());
LOG.warn("Could not retrieve table names from remote datasource, cause: {}", sqle.getMessage());
throw new MetaException("Error retrieving remote table:" + sqle);
} finally {
try {
Expand All @@ -234,9 +234,32 @@ protected boolean isClosed() {
return null;
}

protected abstract ResultSet fetchTableMetadata(String tableName) throws MetaException;
protected ResultSet fetchTableMetadata(String tableName) throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getTables(getCatalogName(), getDatabaseName(), null, new String[] { "TABLE" });
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause: {}", sqle.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + sqle.getMessage());
}
return rs;
}

protected abstract ResultSet fetchTableNames() throws MetaException;
/**
* Returns a list of all table names from the remote database.
* @return List A collection of all the table names, null if there are no tables.
* @throws MetaException To indicate any failures with executing this API
*/
protected ResultSet fetchTableNames() throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getTables(getCatalogName(), getDatabaseName(), null, new String[] { "TABLE" });
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause: {}", sqle.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + sqle);
}
return rs;
}

protected abstract String getCatalogName();

Expand Down Expand Up @@ -270,7 +293,7 @@ protected boolean isClosed() {

table = buildTableFromColsList(tableName, cols);
//Setting the table properties.
table.getParameters().put(JDBC_DATABASE_TYPE, this.type);
table.getParameters().put(JDBC_DATABASE_TYPE, getDatasourceType());
table.getParameters().put(JDBC_DRIVER, this.driverClassName);
table.getParameters().put(JDBC_TABLE, tableName);
table.getParameters().put(JDBC_SCHEMA, scoped_db);
Expand All @@ -285,8 +308,8 @@ protected boolean isClosed() {
}
return table;
} catch (Exception e) {
LOG.warn("Exception retrieving remote table " + scoped_db + "." + tableName + " via data connector "
+ connector.getName());
LOG.warn("Exception retrieving remote table {}.{} via data connector {}", scoped_db, tableName
,connector.getName());
throw new MetaException("Error retrieving remote table:" + e);
} finally {
try {
Expand All @@ -302,7 +325,7 @@ private ResultSet fetchTablesViaDBMetaData(String regex) throws SQLException {
try {
rs = getConnection().getMetaData().getTables(getCatalogName(), getDatabaseName(), regex, new String[]{"TABLE"});
} catch (SQLException sqle) {
LOG.warn("Could not retrieve tables from JDBC table, cause:" + sqle.getMessage());
LOG.warn("Could not retrieve tables from JDBC table, cause: {}", sqle.getMessage());
throw sqle;
}
return rs;
Expand All @@ -313,7 +336,7 @@ private ResultSet fetchColumnsViaDBMetaData(String tableName) throws SQLExceptio
try {
rs = getConnection().getMetaData().getColumns(getCatalogName(), getDatabaseName(), tableName, null);
} catch (SQLException sqle) {
LOG.warn("Could not retrieve columns from JDBC table, cause:" + sqle.getMessage());
LOG.warn("Could not retrieve columns from JDBC table, cause: {}", sqle.getMessage());
throw sqle;
}
return rs;
Expand Down Expand Up @@ -402,7 +425,7 @@ protected String getDataType(String mySqlType, int size) {
try {
return warehouse.getDefaultTablePath(scoped_db, tableName, true).toString();
} catch (MetaException e) {
LOG.info("Error determining default table path, cause:" + e.getMessage());
LOG.info("Error determining default table path, cause: {}", e.getMessage());
}
}
return "some_dummy_path";
Expand All @@ -415,4 +438,7 @@ protected Properties getConnectionProperties() {
connectorPropMap.forEach((k, v) -> connectionProperties.put(k, v));
return connectionProperties;
}
}

@Override
protected String getDatasourceType() { return type; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,6 @@ public DerbySQLConnectorProvider(String dbName, DataConnector connector) {
super(dbName, connector, DRIVER_CLASS);
}

/**
* Returns a list of all table names from the remote database.
* @return List A collection of all the table names, null if there are no tables.
* @throws MetaException To indicate any failures with executing this API
*/
@Override
protected ResultSet fetchTableNames() throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getTables(scoped_db, null, null, new String[] { "TABLE" });
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + sqle.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + sqle);
}
return rs;
}

/**
* Fetch a single table with the given name, returns a Hive Table object from the remote database
* @return Table A Table object for the matching table, null otherwise.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.metastore.dataconnector.jdbc;

import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.metastore.api.DataConnector;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;

public class HiveJDBCConnectorProvider extends AbstractJDBCConnectorProvider {
private static final Logger LOG = LoggerFactory.getLogger(HiveJDBCConnectorProvider.class);
private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
// for Hive the type for connector is "HIVEJDBC" where as on the table we want it to be "HIVE"
protected static final String mappedType = "HIVE";

public HiveJDBCConnectorProvider(String dbName, DataConnector dataConn) {
super(dbName, dataConn, DRIVER_CLASS);
}

@Override protected String getCatalogName() {
return null;
}

@Override protected String getDatabaseName() {
return scoped_db;
}

@Override protected String getDataType(String dbDataType, int size) {
String mappedType = super.getDataType(dbDataType, size);
if (!mappedType.equalsIgnoreCase(ColumnType.VOID_TYPE_NAME)) {
return mappedType;
}

// map any db specific types here.
switch (dbDataType.trim().toLowerCase())
{
case "string":
case "varchar":
mappedType = ColumnType.STRING_TYPE_NAME;
break;
default:
// TODO Hive has support for complex data types but JDBCSerDe only supports primitive types
// SerDe needs to enhanced first to be able to support complex types over federation
mappedType = ColumnType.VOID_TYPE_NAME;
break;
}
return mappedType;
}

@Override protected String getDatasourceType() { return mappedType; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,6 @@ public MSSQLConnectorProvider(String dbName, DataConnector dataConn) {
driverClassName = DRIVER_CLASS;
}

@Override protected ResultSet fetchTableMetadata(String tableName) throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getColumns(null, scoped_db, tableName, null);
} catch (Exception ex) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + ex.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + ex);
}
return rs;
}

@Override protected ResultSet fetchTableNames() throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getTables(null, scoped_db, null, new String[] { "TABLE" });
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + sqle.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + sqle);
}
return rs;
}

@Override protected String getCatalogName() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,6 @@ public MySQLConnectorProvider(String dbName, DataConnector dataConn) {
super(dbName, dataConn, DRIVER_CLASS);
}

/**
* Returns a list of all table names from the remote database.
* @return List A collection of all the table names, null if there are no tables.
* @throws MetaException To indicate any failures with executing this API
*/
@Override protected ResultSet fetchTableNames() throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getTables(scoped_db, null, null, new String[] { "TABLE" });
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + sqle.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + sqle);
}
return rs;
}

/**
* Fetch a single table with the given name, returns a Hive Table object from the remote database
* @return Table A Table object for the matching table, null otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,6 @@ public OracleConnectorProvider(String dbName, DataConnector dataConn) {
driverClassName = DRIVER_CLASS;
}

@Override protected ResultSet fetchTableMetadata(String tableName) throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getColumns(null, scoped_db, tableName, null);
} catch (Exception ex) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + ex.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + ex);
}
return rs;
}

@Override protected ResultSet fetchTableNames() throws MetaException {
ResultSet rs = null;
try {
rs = getConnection().getMetaData().getTables(null, scoped_db, null, new String[] { "TABLE" });
} catch (SQLException sqle) {
LOG.warn("Could not retrieve table names from remote datasource, cause:" + sqle.getMessage());
throw new MetaException("Could not retrieve table names from remote datasource, cause:" + sqle);
}
return rs;
}

@Override protected String getCatalogName() {
return null;
}
Expand Down
Loading

0 comments on commit 076f0ac

Please sign in to comment.