
Commit 55fb56b

tiezhu authored and FlechazoW committed
Cherry-pick commits from DTStack.
1 parent 074d3da commit 55fb56b

44 files changed: +1343 additions, −516 deletions


flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/table/CassandraDynamicTableFactory.java

Lines changed: 3 additions & 3 deletions
@@ -64,9 +64,6 @@
 import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_FETCH_SIZE;
 import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_MAX_RETRIES;
 import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_PARALLELISM;
-import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_MAX_RETRIES;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_FETCH_SIZE;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN_TYPE;
@@ -77,6 +74,9 @@
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNNAME;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNTYPE;
 import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_START_LOCATION;
+import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
+import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
+import static com.dtstack.flinkx.table.options.SinkOptions.SINK_MAX_RETRIES;

 /**
  * @author tiezhu
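
The option constants themselves do not change; only their import path moves from com.dtstack.flinkx.sink.options.SinkOptions to com.dtstack.flinkx.table.options.SinkOptions. A minimal sketch of where such constants typically end up, assuming they are registered in a Flink DynamicTableFactory#optionalOptions() implementation; the class name below is illustrative, not code from this commit.

import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static com.dtstack.flinkx.table.options.SinkOptions.SINK_MAX_RETRIES;

import org.apache.flink.configuration.ConfigOption;

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: registering the relocated sink options the way an
// optionalOptions() implementation would.
public class SinkOptionsSketch {
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(SINK_BUFFER_FLUSH_INTERVAL); // flush interval for buffered writes
        options.add(SINK_BUFFER_FLUSH_MAX_ROWS); // max buffered rows before a flush
        options.add(SINK_MAX_RETRIES);           // write retry count
        return options;
    }
}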

flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/sink/HBaseOutputFormat.java

Lines changed: 1 addition & 15 deletions
@@ -22,7 +22,6 @@
 import com.dtstack.flinkx.connector.hbase14.converter.DataSyncSinkConverter;
 import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
 import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;
-import com.dtstack.flinkx.security.KerberosUtil;
 import com.dtstack.flinkx.sink.format.BaseRichOutputFormat;
 import com.dtstack.flinkx.throwable.WriteRecordException;

@@ -47,8 +46,6 @@
 import java.util.Map;
 import java.util.Objects;

-import static org.apache.zookeeper.client.ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
-
 /**
  * The Hbase Implementation of OutputFormat
  *
@@ -111,18 +108,7 @@ protected void writeSingleRecordInternal(RowData rawRecord) throws WriteRecordEx
     public void openInternal(int taskNumber, int numTasks) throws IOException {
         boolean openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
         if (openKerberos) {
-            // In a TDH environment where ZooKeeper also has Kerberos enabled, the ZooKeeper environment must be configured
-            if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
-                String keytabFile =
-                        HBaseHelper.getKeyTabFileName(
-                                hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
-                String principal = KerberosUtil.getPrincipal(hbaseConfig, keytabFile);
-                String client = System.getProperty(LOGIN_CONTEXT_NAME_KEY, "Client");
-                KerberosUtil.appendOrUpdateJaasConf(client, keytabFile, principal);
-            }
-            UserGroupInformation ugi =
-                    HBaseHelper.getUgi(
-                            hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
+            UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
             ugi.doAs(
                     (PrivilegedAction<Object>)
                             () -> {
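
With the ZooKeeper JAAS handling removed, the Kerberos branch of openInternal() collapses to a single HBaseHelper.getUgi call plus a doAs wrapper. A minimal sketch of that pattern, assuming the simplified API shown in this diff; the class and its openConnection() helper are hypothetical stand-ins for the format's real setup code.

import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;

import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Map;

// Hypothetical wrapper; only the getUgi/doAs pattern mirrors this commit.
public class KerberosOpenSketch {

    // Stand-in for the format's real connection setup, not a method from this file.
    private static void openConnection(Map<String, Object> hbaseConfig) {
        // create the HBase Table / BufferedMutator here
    }

    public static void open(Map<String, Object> hbaseConfig) throws IOException {
        if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) {
            // Keytab, principal and krb5.conf are now resolved inside HBaseHelper.getUgi().
            UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
            ugi.doAs(
                    (PrivilegedAction<Object>)
                            () -> {
                                openConnection(hbaseConfig);
                                return null;
                            });
        } else {
            openConnection(hbaseConfig);
        }
    }
}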

flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormat.java

Lines changed: 5 additions & 38 deletions
@@ -20,11 +20,8 @@

 import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
 import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;
-import com.dtstack.flinkx.security.KerberosUtil;
 import com.dtstack.flinkx.source.format.BaseRichInputFormat;
-import com.dtstack.flinkx.util.PluginUtil;

-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -55,8 +52,6 @@
 import java.util.Locale;
 import java.util.Map;

-import static org.apache.zookeeper.client.ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
-
 /**
  * The InputFormat Implementation used for HbaseReader
  *
@@ -96,31 +91,17 @@ public void openInputFormat() throws IOException {

         LOG.info("HbaseOutputFormat openInputFormat start");
         nameMaps = Maps.newConcurrentMap();
-        // In a TDH environment where the ZooKeeper used by HBase has Kerberos enabled, the ZooKeeper JAAS file configuration must be added
-        if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
-            setZkJaasConfiguration(getRuntimeContext().getDistributedCache());
-        }
-        connection =
-                HBaseHelper.getHbaseConnection(
-                        hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
+
+        connection = HBaseHelper.getHbaseConnection(hbaseConfig);

         LOG.info("HbaseOutputFormat openInputFormat end");
     }

     @Override
     public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOException {
-        DistributedCache distributedCache =
-                PluginUtil.createDistributedCacheFromContextClassLoader();
-        // In a TDH environment where the ZooKeeper used by HBase has Kerberos enabled, the ZooKeeper JAAS file configuration must be added
-        if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
-            setZkJaasConfiguration(getRuntimeContext().getDistributedCache());
-        }
-        try (Connection connection =
-                HBaseHelper.getHbaseConnection(hbaseConfig, distributedCache, jobId)) {
+        try (Connection connection = HBaseHelper.getHbaseConnection(hbaseConfig)) {
             if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) {
-                UserGroupInformation ugi =
-                        HBaseHelper.getUgi(
-                                hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
+                UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
                 return ugi.doAs(
                         (PrivilegedAction<HBaseInputSplit[]>)
                                 () ->
@@ -261,16 +242,10 @@ public void openInternal(InputSplit inputSplit) throws IOException {
         byte[] stopRow = Bytes.toBytesBinary(hbaseInputSplit.getEndKey());

         if (null == connection || connection.isClosed()) {
-            connection =
-                    HBaseHelper.getHbaseConnection(
-                            hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
+            connection = HBaseHelper.getHbaseConnection(hbaseConfig);
         }

         openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
-        // In a TDH environment where the ZooKeeper used by HBase has Kerberos enabled, the ZooKeeper JAAS file configuration must be added
-        if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
-            setZkJaasConfiguration(getRuntimeContext().getDistributedCache());
-        }

         table = connection.getTable(TableName.valueOf(tableName));
         scan = new Scan();
@@ -405,12 +380,4 @@ public Object convertBytesToAssignType(String columnType, byte[] byteArray, Stri
         }
         return column;
     }
-
-    // Set the ZooKeeper JAAS configuration
-    private void setZkJaasConfiguration(DistributedCache distributedCache) {
-        String keytabFile = HBaseHelper.getKeyTabFileName(hbaseConfig, distributedCache, jobId);
-        String principal = KerberosUtil.getPrincipal(hbaseConfig, keytabFile);
-        String client = System.getProperty(LOGIN_CONTEXT_NAME_KEY, "Client");
-        KerberosUtil.appendOrUpdateJaasConf(client, keytabFile, principal);
-    }
 }

flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseHelper.java

Lines changed: 10 additions & 43 deletions
@@ -19,9 +19,6 @@
 package com.dtstack.flinkx.connector.hbase14.util;

 import com.dtstack.flinkx.security.KerberosUtil;
-import com.dtstack.flinkx.util.FileSystemUtil;
-
-import org.apache.flink.api.common.cache.DistributedCache;

 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -44,6 +41,7 @@
 import java.security.PrivilegedAction;
 import java.util.Map;

+import static com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF;
 import static com.dtstack.flinkx.security.KerberosUtil.KRB_STR;

 /**
@@ -60,12 +58,11 @@ public class HBaseHelper {
     private static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization";
     private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable";

-    public static Connection getHbaseConnection(
-            Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId) {
+    public static Connection getHbaseConnection(Map<String, Object> hbaseConfigMap) {
         Validate.isTrue(MapUtils.isNotEmpty(hbaseConfigMap), "hbaseConfig不能为空Map结构!");

         if (HBaseConfigUtils.isEnableKerberos(hbaseConfigMap)) {
-            return getConnectionWithKerberos(hbaseConfigMap, distributedCache, jobId);
+            return getConnectionWithKerberos(hbaseConfigMap);
         }

         try {
@@ -77,11 +74,10 @@ public static Connection getHbaseConnection(
         }
     }

-    private static Connection getConnectionWithKerberos(
-            Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId) {
+    private static Connection getConnectionWithKerberos(Map<String, Object> hbaseConfigMap) {
         try {
             setKerberosConf(hbaseConfigMap);
-            UserGroupInformation ugi = getUgi(hbaseConfigMap, distributedCache, jobId);
+            UserGroupInformation ugi = getUgi(hbaseConfigMap);
             return ugi.doAs(
                     (PrivilegedAction<Connection>)
                             () -> {
@@ -98,19 +94,17 @@ private static Connection getConnectionWithKerberos(
         }
     }

-    public static UserGroupInformation getUgi(
-            Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId)
+    public static UserGroupInformation getUgi(Map<String, Object> hbaseConfigMap)
             throws IOException {
         String keytabFileName = KerberosUtil.getPrincipalFileName(hbaseConfigMap);

-        keytabFileName =
-                KerberosUtil.loadFile(hbaseConfigMap, keytabFileName, distributedCache, jobId);
+        keytabFileName = KerberosUtil.loadFile(hbaseConfigMap, keytabFileName);
         String principal = KerberosUtil.getPrincipal(hbaseConfigMap, keytabFileName);
-        KerberosUtil.loadKrb5Conf(hbaseConfigMap, distributedCache, jobId);
+        KerberosUtil.loadKrb5Conf(hbaseConfigMap);
         KerberosUtil.refreshConfig();
-        Configuration conf = FileSystemUtil.getConfiguration(hbaseConfigMap, null);

-        return KerberosUtil.loginAndReturnUgi(conf, principal, keytabFileName);
+        return KerberosUtil.loginAndReturnUgi(
+                principal, keytabFileName, System.getProperty(KEY_JAVA_SECURITY_KRB5_CONF));
     }

     public static Configuration getConfig(Map<String, Object> hbaseConfigMap) {
@@ -135,33 +129,6 @@ public static void setKerberosConf(Map<String, Object> hbaseConfigMap) {
         hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTH_ENABLE, true);
     }

-    /**
-     * Check whether the ZooKeeper associated with HBase also has Kerberos enabled
-     *
-     * @param hbaseConfigMap
-     * @return
-     */
-    public static boolean openKerberosForZk(Map<String, Object> hbaseConfigMap) {
-        String openKerberos =
-                MapUtils.getString(hbaseConfigMap, "zookeeper.security.authentication", "default");
-        return "kerberos".equalsIgnoreCase(openKerberos);
-    }
-
-    /**
-     * Get the local path of the keytab file
-     *
-     * @param hbaseConfigMap
-     * @param distributedCache
-     * @param jobId
-     * @return
-     */
-    public static String getKeyTabFileName(
-            Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId) {
-        String keytabFileName = KerberosUtil.getPrincipalFileName(hbaseConfigMap);
-        return KerberosUtil.getLocalFileName(
-                hbaseConfigMap, keytabFileName, distributedCache, jobId);
-    }
-
     public static RegionLocator getRegionLocator(Connection hConnection, String userTable) {
         TableName hTableName = TableName.valueOf(userTable);
         Admin admin = null;
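
After this change a caller needs nothing beyond the HBase config map: keytab loading, principal resolution and the krb5.conf path (read from the KEY_JAVA_SECURITY_KRB5_CONF system property) are all handled inside the helper. A minimal caller-side sketch under that assumption; the config key and table name are illustrative values, not taken from this commit.

import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;

import java.util.HashMap;
import java.util.Map;

// Illustrative caller: Kerberos handling is fully encapsulated in HBaseHelper.
public class HBaseHelperSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> hbaseConfig = new HashMap<>();
        hbaseConfig.put("hbase.zookeeper.quorum", "zk-host:2181"); // illustrative value
        // On a Kerberos cluster the map would also carry the principal/keytab entries
        // expected by KerberosUtil; getHbaseConnection() then logs in internally.
        try (Connection connection = HBaseHelper.getHbaseConnection(hbaseConfig)) {
            RegionLocator locator = HBaseHelper.getRegionLocator(connection, "some_table");
            System.out.println(locator.getName());
        }
    }
}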

flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/BaseHdfsOutputFormat.java

Lines changed: 8 additions & 27 deletions
@@ -145,10 +145,7 @@ protected void openSource() {
         try {
             fs =
                     FileSystemUtil.getFileSystem(
-                            hdfsConf.getHadoopConfig(),
-                            hdfsConf.getDefaultFS(),
-                            distributedCache,
-                            jobId);
+                            hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), distributedCache);
         } catch (Exception e) {
             throw new FlinkxRuntimeException("can't init fileSystem", e);
         }
@@ -184,12 +181,10 @@ protected List<String> copyTmpDataFileToDir() {
         try {
             FileStatus[] dataFiles = fs.listStatus(tmpDir, pathFilter);
             for (FileStatus dataFile : dataFiles) {
-                if (!filterFile(dataFile)) {
-                    currentFilePath = dataFile.getPath().getName();
-                    FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf);
-                    copyList.add(currentFilePath);
-                    LOG.info("copy temp file:{} to dir:{}", currentFilePath, dir);
-                }
+                currentFilePath = dataFile.getPath().getName();
+                FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf);
+                copyList.add(currentFilePath);
+                LOG.info("copy temp file:{} to dir:{}", currentFilePath, dir);
             }
         } catch (Exception e) {
             throw new FlinkxRuntimeException(
@@ -229,11 +224,9 @@ protected void moveAllTmpDataFileToDir() {

             FileStatus[] dataFiles = fs.listStatus(tmpDir);
             for (FileStatus dataFile : dataFiles) {
-                if (!filterFile(dataFile)) {
-                    currentFilePath = dataFile.getPath().getName();
-                    fs.rename(dataFile.getPath(), dir);
-                    LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
-                }
+                currentFilePath = dataFile.getPath().getName();
+                fs.rename(dataFile.getPath(), dir);
+                LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
             }
             fs.delete(tmpDir, true);
         } catch (IOException e) {
@@ -294,16 +287,4 @@ public HdfsConf getHdfsConf() {
     public void setHdfsConf(HdfsConf hdfsConf) {
         this.hdfsConf = hdfsConf;
     }
-
-    /** filter file when move file to dataPath* */
-    protected boolean filterFile(FileStatus fileStatus) {
-        if (fileStatus.getLen() == 0) {
-            LOG.warn(
-                    "file {} has filter,because file len [{}] is 0 ",
-                    fileStatus.getPath(),
-                    fileStatus.getLen());
-            return true;
-        }
-        return false;
-    }
 }
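
The jobId argument disappears from FileSystemUtil.getFileSystem, so opening the target filesystem now takes only the Hadoop config map, the defaultFS and a DistributedCache. A minimal standalone sketch of the three-argument call; the empty config map and the hdfs://ns1 namespace are illustrative assumptions, not values from this commit.

import com.dtstack.flinkx.util.FileSystemUtil;
import com.dtstack.flinkx.util.PluginUtil;

import org.apache.flink.api.common.cache.DistributedCache;

import org.apache.hadoop.fs.FileSystem;

import java.util.HashMap;
import java.util.Map;

// Illustrative caller of the trimmed-down getFileSystem signature.
public class HdfsFsSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> hadoopConfig = new HashMap<>(); // extra Hadoop settings would go here
        String defaultFs = "hdfs://ns1";                    // illustrative namespace
        DistributedCache cache = PluginUtil.createDistributedCacheFromContextClassLoader();
        FileSystem fs = FileSystemUtil.getFileSystem(hadoopConfig, defaultFs, cache);
        System.out.println(fs.getUri());
    }
}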

flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/sink/HdfsParquetOutputFormat.java

Lines changed: 1 addition & 21 deletions
@@ -33,7 +33,6 @@

 import org.apache.flink.table.data.RowData;

-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.parquet.column.ParquetProperties;
@@ -51,7 +50,6 @@

 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.List;
@@ -71,9 +69,6 @@ public class HdfsParquetOutputFormat extends BaseHdfsOutputFormat {
     private ParquetWriter<Group> writer;
     private MessageType schema;

-    public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
-    public static int PARQUET_FOOTER_LENGTH_SIZE = 4;
-
     @Override
     protected void openSource() {
         super.openSource();
@@ -135,8 +130,7 @@ protected void nextBlock() {
                 FileSystemUtil.getUGI(
                         hdfsConf.getHadoopConfig(),
                         hdfsConf.getDefaultFS(),
-                        getRuntimeContext().getDistributedCache(),
-                        jobId);
+                        getRuntimeContext().getDistributedCache());
         ugi.doAs(
                 (PrivilegedAction<Object>)
                         () -> {
@@ -297,18 +291,4 @@ private MessageType buildSchema() {

         return typeBuilder.named("Pair");
     }
-
-    @Override
-    protected boolean filterFile(FileStatus fileStatus) {
-        if (fileStatus.getLen()
-                < (long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length)) {
-            LOG.warn(
-                    "file {} has filter,because file len [{}] less than [{}] ",
-                    fileStatus.getPath(),
-                    fileStatus.getLen(),
-                    (long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length));
-            return true;
-        }
-        return false;
-    }
 }

flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/BaseHdfsInputFormat.java

Lines changed: 2 additions & 6 deletions
@@ -70,10 +70,7 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOExcepti
                 PluginUtil.createDistributedCacheFromContextClassLoader();
         UserGroupInformation ugi =
                 FileSystemUtil.getUGI(
-                        hdfsConf.getHadoopConfig(),
-                        hdfsConf.getDefaultFS(),
-                        distributedCache,
-                        jobId);
+                        hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), distributedCache);
         LOG.info("user:{}, ", ugi.getShortUserName());
         return ugi.doAs(
                 (PrivilegedAction<InputSplit[]>)
@@ -101,8 +98,7 @@ public void openInputFormat() throws IOException {
                     FileSystemUtil.getUGI(
                             hdfsConf.getHadoopConfig(),
                             hdfsConf.getDefaultFS(),
-                            getRuntimeContext().getDistributedCache(),
-                            jobId);
+                            getRuntimeContext().getDistributedCache());
         }
     }
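
FileSystemUtil.getUGI loses its jobId parameter in the same way. A minimal sketch of the three-argument call wrapped in the doAs pattern the input format uses around split creation; listSplits() and the surrounding class are hypothetical stand-ins for the real split logic.

import com.dtstack.flinkx.util.FileSystemUtil;
import com.dtstack.flinkx.util.PluginUtil;

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.core.io.InputSplit;

import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedAction;
import java.util.Map;

// Illustrative UGI acquisition plus the doAs wrapper around split creation.
public class HdfsUgiSketch {

    // Hypothetical stand-in for the real split computation.
    private static InputSplit[] listSplits() {
        return new InputSplit[0];
    }

    public static InputSplit[] createSplits(Map<String, Object> hadoopConfig, String defaultFs)
            throws Exception {
        DistributedCache cache = PluginUtil.createDistributedCacheFromContextClassLoader();
        UserGroupInformation ugi = FileSystemUtil.getUGI(hadoopConfig, defaultFs, cache);
        return ugi.doAs((PrivilegedAction<InputSplit[]>) () -> listSplits());
    }
}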

flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsParquetInputFormat.java

Lines changed: 1 addition & 2 deletions
@@ -104,8 +104,7 @@ public InputSplit[] createHdfsSplit(int minNumSplits) {
                 FileSystemUtil.getFileSystem(
                         hdfsConf.getHadoopConfig(),
                         hdfsConf.getDefaultFS(),
-                        PluginUtil.createDistributedCacheFromContextClassLoader(),
-                        jobId)) {
+                        PluginUtil.createDistributedCacheFromContextClassLoader())) {
             allFilePaths = getAllPartitionPath(hdfsConf.getPath(), fs, pathFilter);
         } catch (Exception e) {
             throw new FlinkxRuntimeException(e);
