From 0f4573f2a87a2d0097e3a737e59fab1225b517c1 Mon Sep 17 00:00:00 2001
From: 77954309 <77954309@qq.com>
Date: Sun, 3 Jan 2021 11:49:05 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E7=94=A8es=20restclient=20=E5=85=BC?=
=?UTF-8?q?=E5=AE=B9=205,6,7=E7=89=88=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../datax/datax-elasticsearchwriter/pom.xml | 12 +
.../v6/ElasticRestClient.java | 419 +++---------------
.../v6/ElasticRestHighClient.java | 393 ++++++++++++++++
.../v6/ElasticRestWriter.java | 265 +++++++++++
.../elasticsearchwriter/v6/ElasticWriter.java | 22 +-
.../v6/ElasticWriterErrorCode.java | 4 +-
.../v6/column/ElasticColumn.java | 69 +++
pom.xml | 10 +
8 files changed, 836 insertions(+), 358 deletions(-)
create mode 100644 modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestHighClient.java
create mode 100644 modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java
diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml b/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml
index a8c830f35..ae5d3ef44 100644
--- a/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml
+++ b/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml
@@ -76,6 +76,18 @@
com.alibaba
fastjson
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java
index 59dc6db80..484feebc0 100644
--- a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java
+++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java
@@ -1,393 +1,120 @@
-/*
- *
- * Copyright 2020 WeBank
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6;
import com.alibaba.datax.common.exception.DataXException;
-import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
-import org.apache.http.HttpRequestInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.methods.HttpRequestWrapper;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContexts;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.main.MainResponse;
-import org.elasticsearch.action.support.ActiveShardCount;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.*;
-import org.elasticsearch.client.indices.*;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
-import java.io.IOException;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.security.KeyManagementException;
+import java.net.URL;
import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.util.*;
-import java.util.function.BiConsumer;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
/**
- * @author davidhua
- * 2019/8/1
+ * @Classname ElasticRestClient
+ * @Description TODO
+ * @Date 2021/1/3 11:18
+ * @Created by limeng
*/
public class ElasticRestClient {
- public static final Logger logger = LoggerFactory.getLogger(ElasticRestClient.class);
-
- private static final int HEAP_BUFFER_SIZE = 100 * 1024 * 1024;
- private static final int SOCK_TIMEOUT_IN_MILLISECONDS = 60000;
- private static final int CONN_TIMEOUT_IN_MILLISECONDS = 5000;
- private static final int REQ_TIMEOUT_IN_MILLISECONDS = 60000;
- private static final int MASTER_TIMEOUT_IN_MILLISECONDS = 30000;
-
- private static final String INCLUDE_TYPE_NAME = "include_type_name";
- private static final String MASTER_TIMEOUT = "master_timeout";
-
- static final String FIELD_PROPS = "properties";
- private static final String MAPPING_PATH = "_mapping";
- private static final String MAPPING_TYPE_HEAD = "_mapping_type";
- private static final int DEFAULT_BACKOFF_DELAY_MILLS = 1000;
- private static final int DEFAULT_BACKOFF_TIMES = 3;
- static final String MAPPING_TYPE_DEFAULT = "_doc";
-
- private static final RequestOptions COMMON_OPTIONS;
-
- private List bulkProcessors = new ArrayList<>();
- private Map clientConfig = new HashMap<>();
- private boolean matchVerison = true;
- static{
- RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
- builder.setHttpAsyncResponseConsumerFactory(
- new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(HEAP_BUFFER_SIZE)
- );
- COMMON_OPTIONS = builder.build();
- }
- private RestHighLevelClient restClient;
+ private static final Logger log = LoggerFactory.getLogger(ElasticRestClient.class);
- ElasticRestClient(String[] endPoint, String username, String password, SSLContext sslContext,
- Map clientConfig) throws IOException {
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(username, password));
- initialClient(endPoint, credentialsProvider, sslContext, clientConfig);
+ static RestClient createClient(String[] endPoints) {
+ return createClient(endPoints,null,null,null);
}
- ElasticRestClient(String[] endPoints, CredentialsProvider credentialsProvider,
- SSLContext sslContext, Map clientConfig) throws IOException {
- initialClient(endPoints, credentialsProvider, sslContext, clientConfig);
+ static String getDocumentMetadata(){
+ return null;
}
- BulkProcessor createBulk(BulkProcessor.Listener listener, int bulkActions, int bulkPerTask){
- BiConsumer> consumer = ((bulkRequest, bulkResponseActionListener)
- -> restClient.bulkAsync(bulkRequest, COMMON_OPTIONS, bulkResponseActionListener));
- BulkProcessor.Builder builder = BulkProcessor.builder(consumer, listener);
- builder.setBulkActions(bulkActions);
- builder.setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES));
- builder.setConcurrentRequests(bulkPerTask - 1);
- builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(DEFAULT_BACKOFF_DELAY_MILLS),
- DEFAULT_BACKOFF_TIMES));
- BulkProcessor bulkProcessor = builder.build();
- bulkProcessors.add(bulkProcessor);
- return bulkProcessor;
- }
-
- void close(){
- for(BulkProcessor bulkProcessor : bulkProcessors){
- bulkProcessor.close();
- }
- execute(restClient ->{
- try {
- restClient.close();
- }catch(Exception e){
- throw DataXException.asDataXException(ElasticWriterErrorCode.CLOSE_EXCEPTION, e);
+ static RestClient createClient(String[] endPoints,String username,String password,String keystorePath) {
+ try {
+ HttpHost[] httpHosts = new HttpHost[endPoints.length];
+ int i = 0;
+ for(String address:endPoints){
+ URL url = new URL(address);
+ httpHosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
+ i++;
}
- return null;
- });
- }
-
- boolean existIndices(String... indices){
- return execute(restClient -> restClient.indices().exists(configureTimedRequest(new GetIndexRequest(indices)),
- COMMON_OPTIONS));
- }
-
- boolean deleteIndices(String... indices){
- return execute( restClient -> {
- AcknowledgedResponse response = restClient.indices()
- .delete(new DeleteIndexRequest(indices), COMMON_OPTIONS);
- return response.isAcknowledged();
- });
- }
-
- void createIndex(String indexName, String typeName, Map settings,
- Map