Skip to content

Commit

Permalink
mongodb,hbase连接创建优化
Browse files Browse the repository at this point in the history
  • Loading branch information
weiye committed Mar 26, 2020
1 parent 4cb9e19 commit e031913
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.wugui.datax.admin.service;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;

/**
Expand All @@ -19,7 +18,7 @@ public interface DatasourceQueryService {
* @param id
* @return
*/
List<String> getDBs(Long id) throws UnknownHostException;
List<String> getDBs(Long id) throws IOException;

/**
* 根据数据源表id查询出可用的表
Expand All @@ -34,7 +33,7 @@ public interface DatasourceQueryService {
* @param dbName
* @return
*/
List<String> getCollectionNames(long id,String dbName) throws UnknownHostException;
List<String> getCollectionNames(long id,String dbName) throws IOException;

/**
* 根据数据源id,表名查询出该表所有字段
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;

/**
Expand All @@ -32,10 +31,10 @@ public class DatasourceQueryServiceImpl implements DatasourceQueryService {
private JobDatasourceService jobDatasourceService;

@Override
public List<String> getDBs(Long id) throws UnknownHostException {
public List<String> getDBs(Long id) throws IOException {
//获取数据源对象
JobDatasource datasource = jobDatasourceService.getById(id);
return MongoDBQueryTool.getInstance(datasource).getDBNames();
return new MongoDBQueryTool(datasource).getDBNames();
}


Expand All @@ -48,24 +47,24 @@ public List<String> getTables(Long id) throws IOException {
return Lists.newArrayList();
}
if (JdbcConstants.HBASE.equals(datasource.getDatasource())) {
return HBaseQueryTool.getInstance(datasource).getTableNames();
return new HBaseQueryTool(datasource).getTableNames();
} else if(JdbcConstants.MONGODB.equals(datasource.getDatasource())){
return MongoDBQueryTool.getInstance(datasource).getCollectionNames(datasource.getDatabaseName());
return new MongoDBQueryTool(datasource).getCollectionNames(datasource.getDatabaseName());
} else {
BaseQueryTool qTool = QueryToolFactory.getByDbType(datasource);
return qTool.getTableNames();
}
}

@Override
public List<String> getCollectionNames(long id,String dbName) throws UnknownHostException {
public List<String> getCollectionNames(long id,String dbName) throws IOException {
//获取数据源对象
JobDatasource datasource = jobDatasourceService.getById(id);
//queryTool组装
if (ObjectUtil.isNull(datasource)) {
return Lists.newArrayList();
}
return MongoDBQueryTool.getInstance(datasource).getCollectionNames(dbName);
return new MongoDBQueryTool(datasource).getCollectionNames(dbName);
}


Expand All @@ -78,9 +77,9 @@ public List<String> getColumns(Long id, String tableName) throws IOException {
return Lists.newArrayList();
}
if (JdbcConstants.HBASE.equals(datasource.getDatasource())) {
return HBaseQueryTool.getInstance(datasource).getColumns(tableName);
return new HBaseQueryTool(datasource).getColumns(tableName);
} else if (JdbcConstants.MONGODB.equals(datasource.getDatasource())) {
return MongoDBQueryTool.getInstance(datasource).getColumns(tableName);
return new MongoDBQueryTool(datasource).getColumns(tableName);
} else {
BaseQueryTool queryTool = QueryToolFactory.getByDbType(datasource);
return queryTool.getColumnNames(tableName, datasource.getDatasource());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class JobDatasourceServiceImpl extends ServiceImpl<JobDatasourceMapper, J
@Override
public Boolean dataSourceTest(JobDatasource jobDatasource) throws IOException {
if (JdbcConstants.HBASE.equals(jobDatasource.getDatasource())) {
return HBaseQueryTool.getInstance(jobDatasource).dataSourceTest();
return new HBaseQueryTool(jobDatasource).dataSourceTest();
}
String userName = AESUtil.decrypt(jobDatasource.getJdbcUsername());
// 判断账密是否为密文
Expand All @@ -41,7 +41,7 @@ public Boolean dataSourceTest(JobDatasource jobDatasource) throws IOException {
jobDatasource.setJdbcPassword(AESUtil.encrypt(jobDatasource.getJdbcPassword()));
}
if (JdbcConstants.MONGODB.equals(jobDatasource.getDatasource())) {
return MongoDBQueryTool.getInstance(jobDatasource).dataSourceTest(jobDatasource.getDatabaseName());
return new MongoDBQueryTool(jobDatasource).dataSourceTest(jobDatasource.getDatabaseName());
}
BaseQueryTool queryTool = QueryToolFactory.getByDbType(jobDatasource);
return queryTool.dataSourceTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.wugui.datatx.core.util.Constants;
import com.wugui.datax.admin.core.util.LocalCacheUtil;
import com.wugui.datax.admin.entity.JobDatasource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
Expand All @@ -17,107 +18,101 @@

public class HBaseQueryTool {

private static Configuration conf = HBaseConfiguration.create();
private static ExecutorService pool = Executors.newScheduledThreadPool(2);
private static Connection connection = null;
private static HBaseQueryTool instance = null;
private static Admin admin;
private static Table table;
private Configuration conf = HBaseConfiguration.create();
private ExecutorService pool = Executors.newScheduledThreadPool(2);
private Connection connection = null;
private Admin admin;
private Table table;

public HBaseQueryTool(JobDatasource jobDatasource) throws IOException {
if (LocalCacheUtil.get(jobDatasource.getDatasourceName()) == null) {
getDataSource(jobDatasource);
} else {
connection = (Connection) LocalCacheUtil.get(jobDatasource.getDatasourceName());
if (connection == null || connection.isClosed()) {
LocalCacheUtil.remove(jobDatasource.getDatasourceName());
getDataSource(jobDatasource);
}
}
LocalCacheUtil.set(jobDatasource.getDatasourceName(), connection, 4 * 60 * 60 * 1000);
}

HBaseQueryTool(JobDatasource jobDatasource) throws IOException {
if (connection == null|| connection.isClosed()){
String[] zkAdress=jobDatasource.getZkAdress().split(Constants.SPLIT_SCOLON);
conf.set("hbase.zookeeper.quorum",zkAdress[0]);
conf.set("hbase.zookeeper.property.clientPort", zkAdress[1]);
connection = ConnectionFactory.createConnection(conf,pool);
admin = connection.getAdmin();
private void getDataSource(JobDatasource jobDatasource) throws IOException {
String[] zkAdress = jobDatasource.getZkAdress().split(Constants.SPLIT_SCOLON);
conf.set("hbase.zookeeper.quorum", zkAdress[0]);
conf.set("hbase.zookeeper.property.clientPort", zkAdress[1]);
connection = ConnectionFactory.createConnection(conf, pool);
admin = connection.getAdmin();
}
}

/**
* 获得该类的实例,单例模式
*
* @return
*/
public static HBaseQueryTool getInstance(JobDatasource jobDatasource) throws IOException {
if (instance == null) {
synchronized(HBaseQueryTool.class){
if(instance == null){
instance = new HBaseQueryTool(jobDatasource);
// 关闭连接
public void sourceClose() {
try {
if (admin != null) {
admin.close();
}
if (null != connection) {
connection.close();
}
if (table != null) {
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
return instance;
}

// 关闭连接
public static void sourceClose() {
try {
if (admin != null) {
admin.close();
}
if (null != connection) {
connection.close();
}
if (table != null) {
table.close();
}
} catch (IOException e) {
e.printStackTrace();
/**
* 测试是否连接成功
*
* @return
* @throws IOException
*/
public boolean dataSourceTest() throws IOException {
Admin admin = connection.getAdmin();
HTableDescriptor[] tableDescriptor = admin.listTables();
return tableDescriptor.length > 0;
}
}
/**
* 测试是否连接成功
*
* @return
* @throws IOException
*/
public boolean dataSourceTest() throws IOException {
Admin admin =connection.getAdmin();
HTableDescriptor[] tableDescriptor = admin.listTables();
return tableDescriptor.length > 0;
}

/**
* 获取HBase表名称
* @return
* @throws IOException
*/
public List<String> getTableNames() throws IOException {
List<String> list = new ArrayList<>();
Admin admin = connection.getAdmin();
TableName[] names=admin.listTableNames();
for (int i = 0; i < names.length; i++) {
list.add(names[i].getNameAsString());
/**
* 获取HBase表名称
*
* @return
* @throws IOException
*/
public List<String> getTableNames() throws IOException {
List<String> list = new ArrayList<>();
Admin admin = connection.getAdmin();
TableName[] names = admin.listTableNames();
for (int i = 0; i < names.length; i++) {
list.add(names[i].getNameAsString());
}
return list;
}
return list;
}

/**
*
* 通过表名查询所有l列祖和列
*
* @param tableName
* @return
* @throws IOException
*/
public List<String> getColumns(String tableName) throws IOException {
List<String> list = new ArrayList<>();
table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
//Filter filter = new PageFilter(1);
//scan.setFilter(filter);
scan.getStartRow();
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> it = scanner.iterator();
if(it.hasNext()) {
Result re = it.next();
List<Cell> listCells = re.listCells();
for (Cell cell : listCells) {
list.add(new String(CellUtil.cloneFamily(cell))+":"+new String(CellUtil.cloneQualifier(cell)));
}
/**
* 通过表名查询所有l列祖和列
*
* @param tableName
* @return
* @throws IOException
*/
public List<String> getColumns(String tableName) throws IOException {
List<String> list = new ArrayList<>();
table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
//Filter filter = new PageFilter(1);
//scan.setFilter(filter);
scan.getStartRow();
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> it = scanner.iterator();
if (it.hasNext()) {
Result re = it.next();
List<Cell> listCells = re.listCells();
for (Cell cell : listCells) {
list.add(new String(CellUtil.cloneFamily(cell)) + ":" + new String(CellUtil.cloneQualifier(cell)));
}
}
return list;
}
return list;
}
}
Loading

0 comments on commit e031913

Please sign in to comment.