From 0f4573f2a87a2d0097e3a737e59fab1225b517c1 Mon Sep 17 00:00:00 2001 From: 77954309 <77954309@qq.com> Date: Sun, 3 Jan 2021 11:49:05 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E7=94=A8es=20restclient=20=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=205,6,7=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/datax-elasticsearchwriter/pom.xml | 12 + .../v6/ElasticRestClient.java | 419 +++--------------- .../v6/ElasticRestHighClient.java | 393 ++++++++++++++++ .../v6/ElasticRestWriter.java | 265 +++++++++++ .../elasticsearchwriter/v6/ElasticWriter.java | 22 +- .../v6/ElasticWriterErrorCode.java | 4 +- .../v6/column/ElasticColumn.java | 69 +++ pom.xml | 10 + 8 files changed, 836 insertions(+), 358 deletions(-) create mode 100644 modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestHighClient.java create mode 100644 modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml b/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml index a8c830f35..ae5d3ef44 100644 --- a/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml @@ -76,6 +76,18 @@ com.alibaba fastjson + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java index 59dc6db80..484feebc0 100644 --- a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestClient.java @@ -1,393 +1,120 @@ -/* - * - * Copyright 2020 WeBank - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6; import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.HttpRequestInterceptor; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.methods.HttpRequestWrapper; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContexts; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.main.MainResponse; -import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.*; -import org.elasticsearch.client.indices.*; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; -import java.io.IOException; +import java.io.File; +import java.io.FileInputStream; import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.KeyManagementException; +import java.net.URL; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.util.*; -import java.util.function.BiConsumer; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; /** - * @author davidhua - * 2019/8/1 + * @Classname ElasticRestClient + * @Description TODO + * @Date 2021/1/3 11:18 + * @Created by limeng */ public class ElasticRestClient { - public static final Logger logger = LoggerFactory.getLogger(ElasticRestClient.class); - - private static final int HEAP_BUFFER_SIZE = 100 * 1024 * 1024; - private static final int SOCK_TIMEOUT_IN_MILLISECONDS = 60000; - private static final int CONN_TIMEOUT_IN_MILLISECONDS = 5000; - private static final int REQ_TIMEOUT_IN_MILLISECONDS = 60000; - private static final int MASTER_TIMEOUT_IN_MILLISECONDS = 30000; - - private static final String INCLUDE_TYPE_NAME = "include_type_name"; - private static final String MASTER_TIMEOUT = "master_timeout"; - - static final String FIELD_PROPS = "properties"; - private static final String MAPPING_PATH = "_mapping"; - private static final String MAPPING_TYPE_HEAD = "_mapping_type"; - private static final int DEFAULT_BACKOFF_DELAY_MILLS = 1000; - private static final int DEFAULT_BACKOFF_TIMES = 3; - static final String MAPPING_TYPE_DEFAULT = "_doc"; - - private static final RequestOptions COMMON_OPTIONS; - - private List bulkProcessors = new ArrayList<>(); - private Map clientConfig = new HashMap<>(); - private boolean matchVerison = true; - static{ - RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); - builder.setHttpAsyncResponseConsumerFactory( - new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(HEAP_BUFFER_SIZE) - ); - COMMON_OPTIONS = builder.build(); - } - private RestHighLevelClient restClient; + private static final Logger log = LoggerFactory.getLogger(ElasticRestClient.class); - ElasticRestClient(String[] endPoint, String username, String password, SSLContext sslContext, - Map clientConfig) throws IOException { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(username, password)); - initialClient(endPoint, credentialsProvider, sslContext, clientConfig); + static RestClient createClient(String[] endPoints) { + return createClient(endPoints,null,null,null); } - ElasticRestClient(String[] endPoints, CredentialsProvider credentialsProvider, - SSLContext sslContext, Map clientConfig) throws IOException { - initialClient(endPoints, credentialsProvider, sslContext, clientConfig); + static String getDocumentMetadata(){ + return null; } - BulkProcessor createBulk(BulkProcessor.Listener listener, int bulkActions, int bulkPerTask){ - BiConsumer> consumer = ((bulkRequest, bulkResponseActionListener) - -> restClient.bulkAsync(bulkRequest, COMMON_OPTIONS, bulkResponseActionListener)); - BulkProcessor.Builder builder = BulkProcessor.builder(consumer, listener); - builder.setBulkActions(bulkActions); - builder.setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES)); - builder.setConcurrentRequests(bulkPerTask - 1); - builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(DEFAULT_BACKOFF_DELAY_MILLS), - DEFAULT_BACKOFF_TIMES)); - BulkProcessor bulkProcessor = builder.build(); - bulkProcessors.add(bulkProcessor); - return bulkProcessor; - } - - void close(){ - for(BulkProcessor bulkProcessor : bulkProcessors){ - bulkProcessor.close(); - } - execute(restClient ->{ - try { - restClient.close(); - }catch(Exception e){ - throw DataXException.asDataXException(ElasticWriterErrorCode.CLOSE_EXCEPTION, e); + static RestClient createClient(String[] endPoints,String username,String password,String keystorePath) { + try { + HttpHost[] httpHosts = new HttpHost[endPoints.length]; + int i = 0; + for(String address:endPoints){ + URL url = new URL(address); + httpHosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol()); + i++; } - return null; - }); - } - - boolean existIndices(String... indices){ - return execute(restClient -> restClient.indices().exists(configureTimedRequest(new GetIndexRequest(indices)), - COMMON_OPTIONS)); - } - - boolean deleteIndices(String... indices){ - return execute( restClient -> { - AcknowledgedResponse response = restClient.indices() - .delete(new DeleteIndexRequest(indices), COMMON_OPTIONS); - return response.isAcknowledged(); - }); - } - - void createIndex(String indexName, String typeName, Map settings, - Map properties){ - execute( restClient ->{ - if(!existIndices(indexName)) { - createIndex(indexName, settings); + RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); + if (username != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + restClientBuilder.setHttpClientConfigCallback( + httpAsyncClientBuilder -> + httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } - putMapping(indexName, typeName, properties); - return null; - }); - } - - Map getProps(String indexName, String typeName){ - return execute( restClient->{ - GetMappingsRequest request = new GetMappingsRequest(); - request.indices(indexName); - RequestOptions.Builder optionsBuilder = COMMON_OPTIONS.toBuilder(); - optionsBuilder.addHeader(MAPPING_TYPE_HEAD, typeName); - GetMappingsResponse response = restClient.indices() - .getMapping(configureTimedRequest(request), optionsBuilder.build()); - Map typeMap = response.mappings().get(indexName).sourceAsMap(); - Map propsMap = typeMap; - if(typeMap.containsKey(typeName)) { - Object type = typeMap.get(typeName); - if (type instanceof Map) { - propsMap = (Map)type; + if (keystorePath != null && !keystorePath.isEmpty()) { + KeyStore keyStore = KeyStore.getInstance("jks"); + try (InputStream is = new FileInputStream(new File(keystorePath))) { + String keystorePassword = password; + keyStore.load(is, (keystorePassword == null) ? null : keystorePassword.toCharArray()); } + final SSLContext sslContext = + SSLContexts.custom() + .loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()) + .build(); + final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext); + restClientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy)); } - Object props = propsMap.get(FIELD_PROPS); - if (props instanceof Map) { - return (Map) props; - } - return null; - }); - } - private void putMapping(String indexName, String typeName, Map properties) throws IOException { - if(null == properties){ - properties = new HashMap<>(); - } - Map mappings = new HashMap<>(1); - mappings.put(FIELD_PROPS, properties); - PutMappingRequest request = new PutMappingRequest(indexName).source(mappings); - RequestOptions.Builder optionsBuilder = COMMON_OPTIONS.toBuilder(); - optionsBuilder.addHeader(MAPPING_TYPE_HEAD, typeName); - AcknowledgedResponse acknowledgedResponse = restClient.indices().putMapping(configureTimedRequest(request), optionsBuilder.build()); - if(!acknowledgedResponse.isAcknowledged()){ - throw DataXException.asDataXException(ElasticWriterErrorCode.PUT_MAPPINGS_ERROR, - "can't put mapping, type:[" + typeName +"], properties:" +JSON.toJSONString(properties)); - } - } - private void createIndex(String indexName, Map settings) throws IOException { - if(null == settings){ - settings = new HashMap<>(1); - } - CreateIndexRequest request = new CreateIndexRequest(indexName) - .settings(settings).waitForActiveShards(ActiveShardCount.DEFAULT); - try { - CreateIndexResponse response = restClient.indices().create(configureTimedRequest(request), COMMON_OPTIONS); - if(!response.isAcknowledged()){ - throw DataXException.asDataXException(ElasticWriterErrorCode.CREATE_INDEX_ERROR, "can't create index:[" + indexName + - "], settings:" + JSON.toJSONString(settings) + ", message:[acknowledged=false]"); - } - }catch(ElasticsearchException e){ - if(e.status().getStatus() - != RestStatus.BAD_REQUEST.getStatus()){ - throw e; - } - logger.error("index:["+ indexName +"] maybe already existed, status=" + e.status().getStatus()); - } - } + return restClientBuilder.build(); - private T configureTimedRequest(T request){ - request.setMasterTimeout(TimeValue - .timeValueMillis(Integer - .valueOf(String.valueOf(clientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_MASTER_TIMEOUT, MASTER_TIMEOUT_IN_MILLISECONDS))) - )); - request.setTimeout(TimeValue - .timeValueMillis(Integer - .valueOf(String.valueOf(clientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_REQ_TIMEOUT, REQ_TIMEOUT_IN_MILLISECONDS))) - )); - return request; - } - - private R execute(Exec execFunc){ - try { - return execFunc.apply(restClient); - }catch(ElasticsearchException e){ - throw DataXException.asDataXException(ElasticWriterErrorCode.REQUEST_ERROR, e.status().name(), e); - }catch (Exception e) { - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); - } - } - - - static ElasticRestClient custom(String[] endPoints, Map clientConfig){ - try { - return new ElasticRestClient(endPoints, null, null, clientConfig); - } catch (IOException e) { + }catch (Exception e){ throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); } } - static ElasticRestClient custom(String[] endPoints, - String username, String password, Map clientConfig){ - try { - return new ElasticRestClient(endPoints, username, password, null, clientConfig); - } catch (IOException e) { - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); - } - } - static ElasticRestClient sslCustom(String[] endPoints, - String keyStorePath, String keyStorePass, Map clientConfig){ - try { - return new ElasticRestClient(endPoints, null, buildSSLContext(keyStorePath, keyStorePass) - , clientConfig); - } catch (IOException e) { - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + static Request getRequest(String method, String endpoint, HttpEntity entity, Map params ){ + checkArgument(StringUtils.isNotBlank(method) || StringUtils.isNotBlank(endpoint) ,"request method or endpoint is null"); + Request request = new Request(method,endpoint); + if(entity != null){ + request.setEntity(entity); } - } + if(params !=null && !params.isEmpty()){ + try { + params.forEach((k,v)->{ + request.addParameter(k,v); + }); + }catch (Exception e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BULK_REQ_ERROR, e); + } - static ElasticRestClient sslCustom(String[] endPoints, - String username, String password, - String keyStorePath, String keyStorePass, Map clientConfig){ - try{ - return new ElasticRestClient(endPoints, username, password, - buildSSLContext(keyStorePath, keyStorePass), clientConfig); - }catch(IOException e){ - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); } + return request; } - private static SSLContext buildSSLContext(String keyStorePath, String keyStorePass){ - try { - KeyStore truststore = KeyStore.getInstance("jks"); - try (InputStream inputStream = Files.newInputStream(Paths.get(new URI(keyStorePath)))) { - truststore.load(inputStream, keyStorePass.toCharArray()); - } catch (URISyntaxException | IOException | NoSuchAlgorithmException | CertificateException e) { - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); - } - SSLContextBuilder sslContextBuilder = SSLContexts.custom() - .loadTrustMaterial(truststore, null); - return sslContextBuilder.build(); - }catch(KeyStoreException | NoSuchAlgorithmException | KeyManagementException e){ - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); - } + static Request getRequest(String method, String endpoint, Map params ){ + return getRequest(method,endpoint,null,params); } - private void initialClient(String[] endPoints, CredentialsProvider credentialsProvider, - SSLContext sslContext, Map clientConfig) throws IOException { - if(null == clientConfig){ - clientConfig = Collections.emptyMap(); - } - HttpHost[] httpHosts = new HttpHost[endPoints.length]; - for(int i = 0 ; i < endPoints.length; i++){ - httpHosts[i] = HttpHost.create(endPoints[i]); - } - RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); - Map finalClientConfig = clientConfig; - restClientBuilder.setHttpClientConfigCallback( - httpClientBuilder -> { - if(null != credentialsProvider) { - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } - if(null != sslContext){ - httpClientBuilder.setSSLContext(sslContext); - } - httpClientBuilder.addInterceptorFirst((HttpRequestInterceptor) (httpRequest, httpContext) -> { - if(httpRequest instanceof HttpRequestWrapper){ - HttpRequestWrapper wrapper = (HttpRequestWrapper)httpRequest; - String uri = wrapper.getURI().toString(); - if(matchVerison) { - uri = uri.replace(INCLUDE_TYPE_NAME + "=false", INCLUDE_TYPE_NAME + "=true"); - }else{ - //when use the different version, remove the INCLUDE_TYPE_NAME - uri = uri.replaceAll(INCLUDE_TYPE_NAME + "=[^&]+", "") - .replaceAll(MASTER_TIMEOUT + "=[^&]+", ""); - } - String type = MAPPING_TYPE_DEFAULT; - if (null != wrapper.getFirstHeader(MAPPING_TYPE_HEAD)) { - type = wrapper.getFirstHeader(MAPPING_TYPE_HEAD).getValue(); - } - uri = uri.replace(MAPPING_PATH, MAPPING_PATH + "/" + type); - try { - wrapper.setURI(new URI(uri)); - } catch (URISyntaxException e) { - logger.error(e.getMessage(), e); - } - } - }); - httpClientBuilder.setMaxConnTotal(Integer.parseInt( - String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_POOL_SIZE, 1)))); - return httpClientBuilder; - } - ); - restClientBuilder.setRequestConfigCallback( - requestConfigBuilder -> requestConfigBuilder - .setContentCompressionEnabled(true) - .setConnectTimeout(Integer.parseInt( - String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_CONN_TIMEOUT, - CONN_TIMEOUT_IN_MILLISECONDS)))) - .setConnectionRequestTimeout(Integer.parseInt( - String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_CONN_TIMEOUT, - CONN_TIMEOUT_IN_MILLISECONDS)))) - .setSocketTimeout(Integer.parseInt( - String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_SOCKET_TIMEOUT, - SOCK_TIMEOUT_IN_MILLISECONDS))))); - restClient = new RestHighLevelClient(restClientBuilder); - boolean connect = restClient.ping(COMMON_OPTIONS); - if(! connect){ - throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, "Ping to elastic server failed"); - } - //check the version - checkVersion(); - this.clientConfig = clientConfig; + static Request getRequest(String method, String endpoint, HttpEntity entity){ + return getRequest(method,endpoint,entity,null); } - private void checkVersion() throws IOException { - logger.info("Check the version of ElasticSearch"); - MainResponse response = restClient.info(COMMON_OPTIONS); - Version version = response.getVersion(); - if(!version.isCompatible(Version.CURRENT)){ - throw DataXException.asDataXException(ElasticWriterErrorCode.CONFIG_ERROR, - "ElasticSearch's version is not compatible"); - } - logger.info("The version of ElasticSearch: [" + version.toString() +"]"); - if(version.major != Version.CURRENT.major){ - throw DataXException.asDataXException(ElasticWriterErrorCode.CONFIG_ERROR, - "ElasticSearch's version is not compatible"); - } - } - @FunctionalInterface - interface Exec { - R apply(T t) throws Exception; + static Request getRequest(String method, String endpoint){ + return getRequest(method,endpoint,null,null); } } diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestHighClient.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestHighClient.java new file mode 100644 index 000000000..400fa8865 --- /dev/null +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestHighClient.java @@ -0,0 +1,393 @@ +/* + * + * Copyright 2020 WeBank + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.fastjson.JSON; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.methods.HttpRequestWrapper; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.main.MainResponse; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.*; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.util.*; +import java.util.function.BiConsumer; + +/** + * @author davidhua + * 2019/8/1 + */ +public class ElasticRestHighClient { + public static final Logger logger = LoggerFactory.getLogger(ElasticRestHighClient.class); + + private static final int HEAP_BUFFER_SIZE = 100 * 1024 * 1024; + private static final int SOCK_TIMEOUT_IN_MILLISECONDS = 60000; + private static final int CONN_TIMEOUT_IN_MILLISECONDS = 5000; + private static final int REQ_TIMEOUT_IN_MILLISECONDS = 60000; + private static final int MASTER_TIMEOUT_IN_MILLISECONDS = 30000; + + private static final String INCLUDE_TYPE_NAME = "include_type_name"; + private static final String MASTER_TIMEOUT = "master_timeout"; + + static final String FIELD_PROPS = "properties"; + private static final String MAPPING_PATH = "_mapping"; + private static final String MAPPING_TYPE_HEAD = "_mapping_type"; + private static final int DEFAULT_BACKOFF_DELAY_MILLS = 1000; + private static final int DEFAULT_BACKOFF_TIMES = 3; + static final String MAPPING_TYPE_DEFAULT = "_doc"; + + private static final RequestOptions COMMON_OPTIONS; + + private List bulkProcessors = new ArrayList<>(); + private Map clientConfig = new HashMap<>(); + private boolean matchVerison = true; + static{ + RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + builder.setHttpAsyncResponseConsumerFactory( + new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(HEAP_BUFFER_SIZE) + ); + COMMON_OPTIONS = builder.build(); + } + private RestHighLevelClient restClient; + + ElasticRestHighClient(String[] endPoint, String username, String password, SSLContext sslContext, + Map clientConfig) throws IOException { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + initialClient(endPoint, credentialsProvider, sslContext, clientConfig); + } + + ElasticRestHighClient(String[] endPoints, CredentialsProvider credentialsProvider, + SSLContext sslContext, Map clientConfig) throws IOException { + initialClient(endPoints, credentialsProvider, sslContext, clientConfig); + } + + BulkProcessor createBulk(BulkProcessor.Listener listener, int bulkActions, int bulkPerTask){ + BiConsumer> consumer = ((bulkRequest, bulkResponseActionListener) + -> restClient.bulkAsync(bulkRequest, COMMON_OPTIONS, bulkResponseActionListener)); + BulkProcessor.Builder builder = BulkProcessor.builder(consumer, listener); + builder.setBulkActions(bulkActions); + builder.setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES)); + builder.setConcurrentRequests(bulkPerTask - 1); + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(DEFAULT_BACKOFF_DELAY_MILLS), + DEFAULT_BACKOFF_TIMES)); + BulkProcessor bulkProcessor = builder.build(); + bulkProcessors.add(bulkProcessor); + return bulkProcessor; + } + + void close(){ + for(BulkProcessor bulkProcessor : bulkProcessors){ + bulkProcessor.close(); + } + execute(restClient ->{ + try { + restClient.close(); + }catch(Exception e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.CLOSE_EXCEPTION, e); + } + return null; + }); + } + + boolean existIndices(String... indices){ + return execute(restClient -> restClient.indices().exists(configureTimedRequest(new GetIndexRequest(indices)), + COMMON_OPTIONS)); + } + + boolean deleteIndices(String... indices){ + return execute( restClient -> { + AcknowledgedResponse response = restClient.indices() + .delete(new DeleteIndexRequest(indices), COMMON_OPTIONS); + return response.isAcknowledged(); + }); + } + + void createIndex(String indexName, String typeName, Map settings, + Map properties){ + execute( restClient ->{ + if(!existIndices(indexName)) { + createIndex(indexName, settings); + } + putMapping(indexName, typeName, properties); + return null; + }); + } + + Map getProps(String indexName, String typeName){ + return execute( restClient->{ + GetMappingsRequest request = new GetMappingsRequest(); + request.indices(indexName); + RequestOptions.Builder optionsBuilder = COMMON_OPTIONS.toBuilder(); + optionsBuilder.addHeader(MAPPING_TYPE_HEAD, typeName); + GetMappingsResponse response = restClient.indices() + .getMapping(configureTimedRequest(request), optionsBuilder.build()); + Map typeMap = response.mappings().get(indexName).sourceAsMap(); + Map propsMap = typeMap; + if(typeMap.containsKey(typeName)) { + Object type = typeMap.get(typeName); + if (type instanceof Map) { + propsMap = (Map)type; + } + } + Object props = propsMap.get(FIELD_PROPS); + if (props instanceof Map) { + return (Map) props; + } + return null; + }); + } + private void putMapping(String indexName, String typeName, Map properties) throws IOException { + if(null == properties){ + properties = new HashMap<>(); + } + Map mappings = new HashMap<>(1); + mappings.put(FIELD_PROPS, properties); + PutMappingRequest request = new PutMappingRequest(indexName).source(mappings); + RequestOptions.Builder optionsBuilder = COMMON_OPTIONS.toBuilder(); + optionsBuilder.addHeader(MAPPING_TYPE_HEAD, typeName); + AcknowledgedResponse acknowledgedResponse = restClient.indices().putMapping(configureTimedRequest(request), optionsBuilder.build()); + if(!acknowledgedResponse.isAcknowledged()){ + throw DataXException.asDataXException(ElasticWriterErrorCode.PUT_MAPPINGS_ERROR, + "can't put mapping, type:[" + typeName +"], properties:" +JSON.toJSONString(properties)); + } + } + + private void createIndex(String indexName, Map settings) throws IOException { + if(null == settings){ + settings = new HashMap<>(1); + } + CreateIndexRequest request = new CreateIndexRequest(indexName) + .settings(settings).waitForActiveShards(ActiveShardCount.DEFAULT); + try { + CreateIndexResponse response = restClient.indices().create(configureTimedRequest(request), COMMON_OPTIONS); + if(!response.isAcknowledged()){ + throw DataXException.asDataXException(ElasticWriterErrorCode.CREATE_INDEX_ERROR, "can't create index:[" + indexName + + "], settings:" + JSON.toJSONString(settings) + ", message:[acknowledged=false]"); + } + }catch(ElasticsearchException e){ + if(e.status().getStatus() + != RestStatus.BAD_REQUEST.getStatus()){ + throw e; + } + logger.error("index:["+ indexName +"] maybe already existed, status=" + e.status().getStatus()); + } + } + + private T configureTimedRequest(T request){ + request.setMasterTimeout(TimeValue + .timeValueMillis(Integer + .valueOf(String.valueOf(clientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_MASTER_TIMEOUT, MASTER_TIMEOUT_IN_MILLISECONDS))) + )); + request.setTimeout(TimeValue + .timeValueMillis(Integer + .valueOf(String.valueOf(clientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_REQ_TIMEOUT, REQ_TIMEOUT_IN_MILLISECONDS))) + )); + return request; + } + + private R execute(Exec execFunc){ + try { + return execFunc.apply(restClient); + }catch(ElasticsearchException e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.REQUEST_ERROR, e.status().name(), e); + }catch (Exception e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + } + + + static ElasticRestHighClient custom(String[] endPoints, Map clientConfig){ + try { + return new ElasticRestHighClient(endPoints, null, null, clientConfig); + } catch (IOException e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + } + + static ElasticRestHighClient custom(String[] endPoints, + String username, String password, Map clientConfig){ + try { + return new ElasticRestHighClient(endPoints, username, password, null, clientConfig); + } catch (IOException e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + } + + static ElasticRestHighClient sslCustom(String[] endPoints, + String keyStorePath, String keyStorePass, Map clientConfig){ + try { + return new ElasticRestHighClient(endPoints, null, buildSSLContext(keyStorePath, keyStorePass) + , clientConfig); + } catch (IOException e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + } + + static ElasticRestHighClient sslCustom(String[] endPoints, + String username, String password, + String keyStorePath, String keyStorePass, Map clientConfig){ + try{ + return new ElasticRestHighClient(endPoints, username, password, + buildSSLContext(keyStorePath, keyStorePass), clientConfig); + }catch(IOException e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + } + + private static SSLContext buildSSLContext(String keyStorePath, String keyStorePass){ + try { + KeyStore truststore = KeyStore.getInstance("jks"); + try (InputStream inputStream = Files.newInputStream(Paths.get(new URI(keyStorePath)))) { + truststore.load(inputStream, keyStorePass.toCharArray()); + } catch (URISyntaxException | IOException | NoSuchAlgorithmException | CertificateException e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + SSLContextBuilder sslContextBuilder = SSLContexts.custom() + .loadTrustMaterial(truststore, null); + return sslContextBuilder.build(); + }catch(KeyStoreException | NoSuchAlgorithmException | KeyManagementException e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + } + + private void initialClient(String[] endPoints, CredentialsProvider credentialsProvider, + SSLContext sslContext, Map clientConfig) throws IOException { + if(null == clientConfig){ + clientConfig = Collections.emptyMap(); + } + HttpHost[] httpHosts = new HttpHost[endPoints.length]; + for(int i = 0 ; i < endPoints.length; i++){ + httpHosts[i] = HttpHost.create(endPoints[i]); + } + RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); + Map finalClientConfig = clientConfig; + restClientBuilder.setHttpClientConfigCallback( + httpClientBuilder -> { + if(null != credentialsProvider) { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + if(null != sslContext){ + httpClientBuilder.setSSLContext(sslContext); + } + httpClientBuilder.addInterceptorFirst((HttpRequestInterceptor) (httpRequest, httpContext) -> { + if(httpRequest instanceof HttpRequestWrapper){ + HttpRequestWrapper wrapper = (HttpRequestWrapper)httpRequest; + String uri = wrapper.getURI().toString(); + if(matchVerison) { + uri = uri.replace(INCLUDE_TYPE_NAME + "=false", INCLUDE_TYPE_NAME + "=true"); + }else{ + //when use the different version, remove the INCLUDE_TYPE_NAME + uri = uri.replaceAll(INCLUDE_TYPE_NAME + "=[^&]+", "") + .replaceAll(MASTER_TIMEOUT + "=[^&]+", ""); + } + String type = MAPPING_TYPE_DEFAULT; + if (null != wrapper.getFirstHeader(MAPPING_TYPE_HEAD)) { + type = wrapper.getFirstHeader(MAPPING_TYPE_HEAD).getValue(); + } + uri = uri.replace(MAPPING_PATH, MAPPING_PATH + "/" + type); + try { + wrapper.setURI(new URI(uri)); + } catch (URISyntaxException e) { + logger.error(e.getMessage(), e); + } + } + }); + httpClientBuilder.setMaxConnTotal(Integer.parseInt( + String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_POOL_SIZE, 1)))); + return httpClientBuilder; + } + ); + restClientBuilder.setRequestConfigCallback( + requestConfigBuilder -> requestConfigBuilder + .setContentCompressionEnabled(true) + .setConnectTimeout(Integer.parseInt( + String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_CONN_TIMEOUT, + CONN_TIMEOUT_IN_MILLISECONDS)))) + .setConnectionRequestTimeout(Integer.parseInt( + String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_CONN_TIMEOUT, + CONN_TIMEOUT_IN_MILLISECONDS)))) + .setSocketTimeout(Integer.parseInt( + String.valueOf(finalClientConfig.getOrDefault(ElasticKey.CLIENT_CONFIG_SOCKET_TIMEOUT, + SOCK_TIMEOUT_IN_MILLISECONDS))))); + restClient = new RestHighLevelClient(restClientBuilder); + boolean connect = restClient.ping(COMMON_OPTIONS); + if(! connect){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, "Ping to elastic server failed"); + } + //check the version + checkVersion(); + this.clientConfig = clientConfig; + } + + private void checkVersion() throws IOException { + logger.info("Check the version of ElasticSearch"); + MainResponse response = restClient.info(COMMON_OPTIONS); + Version version = response.getVersion(); + if(!version.isCompatible(Version.CURRENT)){ + throw DataXException.asDataXException(ElasticWriterErrorCode.CONFIG_ERROR, + "ElasticSearch's version is not compatible"); + } + logger.info("The version of ElasticSearch: [" + version.toString() +"]"); + if(version.major != Version.CURRENT.major){ + throw DataXException.asDataXException(ElasticWriterErrorCode.CONFIG_ERROR, + "ElasticSearch's version is not compatible"); + } + } + @FunctionalInterface + interface Exec { + R apply(T t) throws Exception; + } + +} diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java new file mode 100644 index 000000000..6ce778fdb --- /dev/null +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java @@ -0,0 +1,265 @@ +package com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.BasicDataReceiver; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6.column.ElasticColumn; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6.ElasticRestWriter.Job.DEFAULT_ENDPOINT_SPLIT; +import static com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6.ElasticRestWriter.Job.WRITE_SIZE; + +/** + * @Classname ElasticRestWriter + * @Description TODO + * @Date 2021/1/3 11:21 + * @Created by limeng + */ +public class ElasticRestWriter extends Writer { + public static class Job extends Writer.Job{ + private static final Logger log = LoggerFactory.getLogger(Job.class); + + private static final String DEFAULT_ID = "_id"; + static final String WRITE_SIZE = "WRITE_SIZE"; + + static final String DEFAULT_ENDPOINT_SPLIT = ","; + + private Configuration jobConf = null; + private String[] endPoints; + + @Override + public void prepare() { + String indexName = this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE); + String indexType = this.jobConf.getString(ElasticKey.INDEX_TYPE, "_doc"); + log.info(String.format("index:[%s], type:[%s]", indexName, indexType)); + } + + @Override + public List split(int mandatoryNumber) { + List configurations = new ArrayList<>(); + for( int i = 0; i < mandatoryNumber; i++){ + configurations.add(this.jobConf.clone()); + } + return configurations; + } + + @Override + public void init() { + this.jobConf = super.getPluginJobConf(); + this.validateParams(); + } + + @Override + public void destroy() { + + } + + private void validateParams(){ + String endPoints = this.jobConf.getString(ElasticKey.ENDPOINTS); + if(StringUtils.isBlank(endPoints)){ + throw DataXException.asDataXException(ElasticWriterErrorCode.REQUIRE_VALUE, "'endPoints(elasticUrls)' is necessary"); + } + + this.endPoints = endPoints.split(DEFAULT_ENDPOINT_SPLIT); + this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE); + } + } + + public static class Task extends Writer.Task{ + private static final Logger logger = LoggerFactory.getLogger(Task.class); + + private Configuration taskConf; + private String indexName; + private String typeName; + private String columnNameSeparator = ElasticColumn.DEFAULT_NAME_SPLIT; + private List columns; + private RestClient restClient; + + private ArrayList batch; + + private Integer batchSize; + private long currentBatchSizeBytes; + private Long maxBatchSizeBytes = 5L * 1024L * 1024L; + private int backendVersion; + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Override + public void startWrite(BasicDataReceiver receiver, Class type) { + logger.info("Begin to BasicDataReceiver write record to ElasticSearch"); + super.startWrite(receiver, type); + logger.info("End to BasicDataReceiver write record to ElasticSearch"); + throw DataXException.asDataXException(ElasticWriterErrorCode.FEATURES_ERROR, "功能不支持"); + } + @Override + public void startWrite(RecordReceiver lineReceiver) { + logger.info("Begin to rest write record to ElasticSearch"); + Record record = null; + long count = 0; + while(null != (record = lineReceiver.getFromReader())){ + String document = ElasticColumn.recordToString(record, columns, columnNameSeparator); + String documentMetadata = "{}"; + batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document)); + + currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length; + + if(batch.size() >= batchSize || currentBatchSizeBytes >= maxBatchSizeBytes){ + flushBatch(); + } + count += 1; + } + flushBatch(); + getTaskPluginCollector().collectMessage(WRITE_SIZE, String.valueOf(count)); + logger.info("End to rest write record to ElasticSearch"); + } + + private void flushBatch() { + if(batch.isEmpty()){ + return; + } + + StringBuilder bulkRequest = new StringBuilder(); + for(String json:batch){ + bulkRequest.append(json); + } + batch.clear(); + currentBatchSizeBytes = 0; + Response response; + HttpEntity responseEntity; + String endPoint = String.format( + "/%s/%s/_bulk", + indexName, + typeName); + + + HttpEntity requestBody = + new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); + + try { + response = restClient.performRequest(ElasticRestClient.getRequest("POST", endPoint, requestBody)); + responseEntity = new BufferedHttpEntity(response.getEntity()); + }catch (IOException e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BAD_CONNECT, e); + } + + try { + checkForErrors(responseEntity, backendVersion); + }catch (IOException e){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BULK_REQ_ERROR, e); + } + + } + + @Override + public void init() { + this.taskConf = super.getPluginJobConf(); + indexName = this.taskConf.getString(ElasticKey.INDEX_NAME); + typeName = this.taskConf.getString(ElasticKey.INDEX_TYPE, ElasticRestHighClient.MAPPING_TYPE_DEFAULT); + columnNameSeparator = this.taskConf.getString(ElasticKey.COLUMN_NAME_SEPARATOR, ElasticColumn.DEFAULT_NAME_SPLIT); + batchSize = this.taskConf.getInt(ElasticKey.BULK_ACTIONS, 10000); + batch = new ArrayList<>(); + + + columns = JSON.parseObject(this.taskConf.getString(ElasticKey.PROPS_COLUMN), new TypeReference>(){ + }); + + + String[] endPoints = this.taskConf.getString(ElasticKey.ENDPOINTS).split(DEFAULT_ENDPOINT_SPLIT); + restClient = ElasticRestClient.createClient(endPoints); + + backendVersion = getBackendVersion(); + } + + @Override + public void destroy() { + if(null != restClient){ + try { + restClient.close(); + } catch (IOException e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.CLOSE_EXCEPTION, e); + } + } + } + + private void checkForErrors(HttpEntity responseEntity, int backendVersion) throws IOException { + JsonNode searchResult = parseResponse(responseEntity); + boolean errors = searchResult.path("errors").asBoolean(); + if (errors) { + StringBuilder errorMessages = + new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:"); + JsonNode items = searchResult.path("items"); + //some items present in bulk might have errors, concatenate error messages + for (JsonNode item : items) { + + String errorRootName = ""; + if (backendVersion == 2) { + errorRootName = "create"; + } else if (backendVersion == 5 || backendVersion == 6) { + errorRootName = "index"; + } + JsonNode errorRoot = item.path(errorRootName); + JsonNode error = errorRoot.get("error"); + if (error != null) { + String type = error.path("type").asText(); + String reason = error.path("reason").asText(); + String docId = errorRoot.path("_id").asText(); + errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); + JsonNode causedBy = error.get("caused_by"); + if (causedBy != null) { + String cbReason = causedBy.path("reason").asText(); + String cbType = causedBy.path("type").asText(); + errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType)); + } + } + } + + throw new IOException(errorMessages.toString()); + } + } + + private int getBackendVersion(){ + try{ + Response response = restClient.performRequest(ElasticRestClient.getRequest("GET", "")); + JsonNode jsonNode = parseResponse(response.getEntity()); + int backendVersion = + Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 1)); + checkArgument( + (backendVersion == 2 || backendVersion == 5 || backendVersion == 6 || backendVersion==7), + "The Elasticsearch version to connect to is %s.x. " + + "This version of the ElasticsearchIO is only compatible with " + + "Elasticsearch v6.x, v5.x and v2.x", + backendVersion); + return backendVersion; + + } catch (IOException e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.ES_VERSION, e); + } + } + + private static JsonNode parseResponse(HttpEntity responseEntity) throws IOException { + return mapper.readValue(responseEntity.getContent(), JsonNode.class); + } + + } +} diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriter.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriter.java index a91d83eff..d09f8cb08 100644 --- a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriter.java +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriter.java @@ -73,13 +73,13 @@ public void init() { } @Override public void prepare() { - ElasticRestClient restClient; + ElasticRestHighClient restClient; Map clientConfig = jobConf.getMap(ElasticKey.CLIENT_CONFIG); if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)){ - restClient = ElasticRestClient.custom(endPoints, userName, + restClient = ElasticRestHighClient.custom(endPoints, userName, password, clientConfig); }else{ - restClient = ElasticRestClient.custom(endPoints, clientConfig); + restClient = ElasticRestHighClient.custom(endPoints, clientConfig); } String indexName = this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE); String indexType = this.jobConf.getString(ElasticKey.INDEX_TYPE, ""); @@ -136,7 +136,7 @@ private void validateParams(){ this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE); } - private Map resolveColumn(ElasticRestClient client, + private Map resolveColumn(ElasticRestHighClient client, String index, String type , List rawColumnList, List outputColumn, String columnNameSeparator){ @@ -192,8 +192,8 @@ private void resolveColumn(List outputColumn, ElasticColumn colum levelColumn.setFormat(String.valueOf(metaMap.get(ElasticKey.PROPS_COLUMN_FORMAT))); } outputColumn.add(levelColumn); - }else if(null != metaMap.get(ElasticRestClient.FIELD_PROPS) - && metaMap.get(ElasticRestClient.FIELD_PROPS) instanceof Map){ + }else if(null != metaMap.get(ElasticRestHighClient.FIELD_PROPS) + && metaMap.get(ElasticRestHighClient.FIELD_PROPS) instanceof Map){ ElasticColumn levelColumn = column; if(null == levelColumn){ levelColumn = new ElasticColumn(); @@ -201,7 +201,7 @@ private void resolveColumn(List outputColumn, ElasticColumn colum }else{ levelColumn.setName(levelColumn.getName() + columnNameSeparator + key); } - resolveColumn(outputColumn, levelColumn, (Map)metaMap.get(ElasticRestClient.FIELD_PROPS), + resolveColumn(outputColumn, levelColumn, (Map)metaMap.get(ElasticRestHighClient.FIELD_PROPS), columnNameSeparator); } } @@ -218,14 +218,14 @@ public static class Task extends Writer.Task{ private String typeName; private String columnNameSeparator = ElasticColumn.DEFAULT_NAME_SPLIT; private List columns; - private ElasticRestClient restClient; + private ElasticRestHighClient restClient; private BulkProcessor bulkProcessor; @Override public void init() { this.taskConf = super.getPluginJobConf(); indexName = this.taskConf.getString(ElasticKey.INDEX_NAME); - typeName = this.taskConf.getString(ElasticKey.INDEX_TYPE, ElasticRestClient.MAPPING_TYPE_DEFAULT); + typeName = this.taskConf.getString(ElasticKey.INDEX_TYPE, ElasticRestHighClient.MAPPING_TYPE_DEFAULT); columnNameSeparator = this.taskConf.getString(ElasticKey.COLUMN_NAME_SEPARATOR, ElasticColumn.DEFAULT_NAME_SPLIT); int batchSize = this.taskConf.getInt(ElasticKey.BULK_ACTIONS, 1000); int bulkPerTask = this.taskConf.getInt(ElasticKey.BULK_PER_TASK, 1); @@ -242,10 +242,10 @@ public void init() { } String[] endPoints = this.taskConf.getString(ElasticKey.ENDPOINTS).split(DEFAULT_ENDPOINT_SPLIT); if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)){ - restClient = ElasticRestClient.custom(endPoints, userName, + restClient = ElasticRestHighClient.custom(endPoints, userName, password, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG)); }else{ - restClient = ElasticRestClient.custom(endPoints, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG)); + restClient = ElasticRestHighClient.custom(endPoints, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG)); } this.bulkProcessor = restClient.createBulk(buildListener(getTaskPluginCollector()), batchSize, bulkPerTask); } diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriterErrorCode.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriterErrorCode.java index 64a5dfe96..4496c6428 100644 --- a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriterErrorCode.java +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticWriterErrorCode.java @@ -37,7 +37,9 @@ public enum ElasticWriterErrorCode implements ErrorCode { MAPPING_TYPE_UNSUPPORTED("ESWriter-08", "Unsupported mapping type"), BULK_REQ_ERROR("ESWriter-09", "Bulk request error"), INDEX_NOT_EXIST("ESWriter-10", "Index not exist"), - CONFIG_ERROR("ESWriter-11", "Config error"); + CONFIG_ERROR("ESWriter-11", "Config error"), + ES_VERSION("ESWriter-12", "Cannot get Elasticsearch version"), + FEATURES_ERROR("ESWriter-13", "Function not supported"); private final String code; private final String description; diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/column/ElasticColumn.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/column/ElasticColumn.java index 75b6c80a0..f137c5e60 100644 --- a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/column/ElasticColumn.java +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/column/ElasticColumn.java @@ -87,6 +87,75 @@ public void setFormat(String format) { this.format = format; } + public static String recordToString(Record record, List colConfs, String columnNameSeparator){ + String sql1="{%s}"; + boolean first=false; + + StringBuffer sb=new StringBuffer(); + for(int i = 0; i < record.getColumnNumber(); i++){ + Column column = record.getColumn(i); + ElasticColumn config = colConfs.get(i); + String columnName = config.getName(); + if(first){ + sb.append(","); + } + first = true; + sb.append("\"").append(columnName).append("\":"); + ElasticFieldDataType type = ElasticFieldDataType.valueOf(config.getType().toUpperCase()); + switch(type){ + case IP: + case IP_RANGE: + case KEYWORD: + case TEXT: + sb.append("\"").append(column.asString()).append("\"");; + break; + case GEO_POINT: + case GEO_SHAPE: + case NESTED: + case OBJECT: + sb.append("\"").append(parseObject(column.asString())).append("\""); + break; + case LONG_RANGE: + case LONG: + sb.append(column.asLong()); + break; + case INTEGER: + case INTEGER_RANGE: + case SHORT: + sb.append(column.asBigInteger()); + break; + case FLOAT: + case FLOAT_RANGE: + case HALF_FLOAT: + case SCALED_FLOAT: + case DOUBLE_RANGE: + case DOUBLE: + sb.append(column.asDouble()); + break; + case BINARY: + case BYTE: + sb.append("\"").append(column.asString()).append("\""); + break; + case BOOLEAN: + sb.append(column.asBoolean()); + break; + case DATE_RANGE: + case DATE: + sb.append("\"").append(parseDate(config, column)).append("\""); + break; + default: + throw DataXException.asDataXException(ElasticWriterErrorCode.MAPPING_TYPE_UNSUPPORTED, + "unsupported type:[" +config.getType() + "]"); + } + } + String sql2=sb.toString(); + if(StringUtils.isNoneBlank(sql2)){ + return String.format(sql1, sql2); + }else{ + return null; + } + } + public static Map toData(Record record, List colConfs, String columnNameSeparator){ Map outputData = new HashMap<>(record.getColumnNumber()); for(int i = 0; i < record.getColumnNumber(); i++){ diff --git a/pom.xml b/pom.xml index 18f97fb86..957a1f10b 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,16 @@ jackson-core ${jackson.version} + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + io.springfox springfox-swagger2 From 3ab2c8afd72b40195b9b1ad3a03898426ffae872 Mon Sep 17 00:00:00 2001 From: 77954309 <77954309@qq.com> Date: Tue, 5 Jan 2021 14:59:14 +0800 Subject: [PATCH 2/2] =?UTF-8?q?ElasticRestHighClient=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=20=E5=88=9B=E5=BB=BA=E7=B4=A2=E5=BC=95=EF=BC=8C=E5=AD=97?= =?UTF-8?q?=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../v6/ElasticRestWriter.java | 211 +++++++++++++++++- 1 file changed, 205 insertions(+), 6 deletions(-) diff --git a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java index 6ce778fdb..ef91e0d30 100644 --- a/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java +++ b/modules/executor/engine/datax/datax-elasticsearchwriter/src/main/java/com/webank/wedatasphere/exchangis/datax/plugin/writer/elasticsearchwriter/v6/ElasticRestWriter.java @@ -4,18 +4,29 @@ import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.BasicDataReceiver; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.webank.wedatasphere.exchangis.datax.common.CryptoUtils; import com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6.column.ElasticColumn; +import com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6.column.ElasticFieldDataType; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.entity.BufferedHttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.slf4j.Logger; @@ -23,8 +34,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import static com.google.common.base.Preconditions.checkArgument; import static com.webank.wedatasphere.exchangis.datax.plugin.writer.elasticsearchwriter.v6.ElasticRestWriter.Job.DEFAULT_ENDPOINT_SPLIT; @@ -47,12 +57,115 @@ public static class Job extends Writer.Job{ private Configuration jobConf = null; private String[] endPoints; + private String userName; + private String password; @Override public void prepare() { String indexName = this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE); String indexType = this.jobConf.getString(ElasticKey.INDEX_TYPE, "_doc"); log.info(String.format("index:[%s], type:[%s]", indexName, indexType)); + + //检查索引,创建索引,可以根据实际情况修改 + ElasticRestHighClient restClient; + Map clientConfig = jobConf.getMap(ElasticKey.CLIENT_CONFIG); + if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)){ + restClient = ElasticRestHighClient.custom(endPoints, userName, + password, clientConfig); + }else{ + restClient = ElasticRestHighClient.custom(endPoints, clientConfig); + } + String columnNameSeparator = this.jobConf.getString(ElasticKey.COLUMN_NAME_SEPARATOR, ElasticColumn.DEFAULT_NAME_SPLIT); + List rawColumnList = jobConf + .getList(ElasticKey.PROPS_COLUMN); + List resolvedColumnList = new ArrayList<>(); + Map props = resolveColumn(restClient, indexName, indexType, + rawColumnList, resolvedColumnList, columnNameSeparator); + this.jobConf.set(ElasticKey.PROPS_COLUMN, resolvedColumnList); + //clean up + if(jobConf.getBool(ElasticKey.CLEANUP, false) && + restClient.existIndices(indexName)){ + if(!restClient.deleteIndices(indexName)){ + throw DataXException.asDataXException(ElasticWriterErrorCode.DELETE_INDEX_ERROR, "cannot delete index:[" + indexName +"]"); + } + } + + //if the index is not existed, create it + restClient.createIndex(indexName, indexType, jobConf.getMap(ElasticKey.SETTINGS), + props); + restClient.close(); + } + + private Map resolveColumn(ElasticRestHighClient client, + String index, String type , + List rawColumnList, List outputColumn, + String columnNameSeparator){ + Map properties; + if(null != rawColumnList && !rawColumnList.isEmpty()) { + //allow to custom the fields of properties + properties = new HashMap<>(rawColumnList.size()); + rawColumnList.forEach(columnRaw -> { + String raw = columnRaw.toString(); + ElasticColumn column = JSONObject + .parseObject(raw, ElasticColumn.class); + if (StringUtils.isNotBlank(column.getName()) && StringUtils.isNotBlank(column.getType())) { + outputColumn.add(column); + if (!column.getName().equals(DEFAULT_ID) && ElasticFieldDataType.valueOf(column.getType().toUpperCase()) + != ElasticFieldDataType.ALIAS) { + Map property = JSONObject.parseObject(raw, Map.class); + property.remove(ElasticKey.PROPS_COLUMN_NAME); + properties.put(column.getName(), property); + } + } + }); + }else{ + if(!client.existIndices(index)){ + throw DataXException.asDataXException(ElasticWriterErrorCode.INDEX_NOT_EXIST, + "cannot get columns from index:[" + index +"]"); + } + //get properties from index existed + properties = client.getProps(index, type); + resolveColumn(outputColumn, null, properties, columnNameSeparator); + //Reverse outputColumn + Collections.reverse(outputColumn); + } + return properties; + } + + private void resolveColumn(List outputColumn, ElasticColumn column, + Map propsMap, String columnNameSeparator){ + propsMap.forEach((key, value) ->{ + if(value instanceof Map){ + Map metaMap = (Map)value; + if(null != metaMap.get(ElasticKey.PROPS_COLUMN_TYPE)){ + ElasticColumn levelColumn = new ElasticColumn(); + if(null != column) { + levelColumn.setName(column.getName() + columnNameSeparator + key); + }else{ + levelColumn.setName(String.valueOf(key)); + } + levelColumn.setType(String.valueOf(metaMap.get(ElasticKey.PROPS_COLUMN_TYPE))); + if(null != metaMap.get(ElasticKey.PROPS_COLUMN_TIMEZONE)){ + levelColumn.setTimezone(String.valueOf(metaMap.get(ElasticKey.PROPS_COLUMN_TIMEZONE))); + } + if(null != metaMap.get(ElasticKey.PROPS_COLUMN_FORMAT)){ + levelColumn.setFormat(String.valueOf(metaMap.get(ElasticKey.PROPS_COLUMN_FORMAT))); + } + outputColumn.add(levelColumn); + }else if(null != metaMap.get(ElasticRestHighClient.FIELD_PROPS) + && metaMap.get(ElasticRestHighClient.FIELD_PROPS) instanceof Map){ + ElasticColumn levelColumn = column; + if(null == levelColumn){ + levelColumn = new ElasticColumn(); + levelColumn.setName(String.valueOf(key)); + }else{ + levelColumn.setName(levelColumn.getName() + columnNameSeparator + key); + } + resolveColumn(outputColumn, levelColumn, (Map)metaMap.get(ElasticRestHighClient.FIELD_PROPS), + columnNameSeparator); + } + } + }); } @Override @@ -81,6 +194,16 @@ private void validateParams(){ throw DataXException.asDataXException(ElasticWriterErrorCode.REQUIRE_VALUE, "'endPoints(elasticUrls)' is necessary"); } + this.userName = this.jobConf.getString(ElasticKey.USERNAME, ""); + this.password = this.jobConf.getString(ElasticKey.PASSWORD, ""); + if(StringUtils.isNotBlank(this.password)){ + try { + this.password = (String) CryptoUtils.string2Object(this.password); + } catch (Exception e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.CONFIG_ERROR, "decrypt password failed"); + } + } + this.endPoints = endPoints.split(DEFAULT_ENDPOINT_SPLIT); this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE); } @@ -103,14 +226,33 @@ public static class Task extends Writer.Task{ private Long maxBatchSizeBytes = 5L * 1024L * 1024L; private int backendVersion; + private ElasticRestHighClient restHighClient; + private BulkProcessor bulkProcessor; + private volatile boolean bulkError; + private static final ObjectMapper mapper = new ObjectMapper(); @Override public void startWrite(BasicDataReceiver receiver, Class type) { - logger.info("Begin to BasicDataReceiver write record to ElasticSearch"); - super.startWrite(receiver, type); - logger.info("End to BasicDataReceiver write record to ElasticSearch"); - throw DataXException.asDataXException(ElasticWriterErrorCode.FEATURES_ERROR, "功能不支持"); + if(type.equals(DocWriteRequest.class)){ + logger.info("Begin to write record to ElasticSearch"); + long count = 0; + DocWriteRequest request = null; + while(null != (request = (DocWriteRequest) receiver.getFromReader())){ + request.index(indexName); + request.type(typeName); + if(bulkError){ + throw DataXException.asDataXException(ElasticWriterErrorCode.BULK_REQ_ERROR, ""); + } + this.bulkProcessor.add(request); + count += 1; + } + this.bulkProcessor.close(); + getTaskPluginCollector().collectMessage(ElasticWriter.Job.WRITE_SIZE, String.valueOf(count)); + logger.info("End to write record to ElasticSearch"); + }else{ + super.startWrite(receiver, type); + } } @Override public void startWrite(RecordReceiver lineReceiver) { @@ -189,6 +331,27 @@ public void init() { restClient = ElasticRestClient.createClient(endPoints); backendVersion = getBackendVersion(); + + //创建索引映射 + int bulkPerTask = this.taskConf.getInt(ElasticKey.BULK_PER_TASK, 1); + String userName = this.taskConf.getString(ElasticKey.USERNAME, ""); + String password = this.taskConf.getString(ElasticKey.PASSWORD, ""); + if(StringUtils.isNotBlank(password)){ + try { + password = (String) CryptoUtils.string2Object(password); + } catch (Exception e) { + throw DataXException.asDataXException(ElasticWriterErrorCode.CONFIG_ERROR, "decrypt password failed"); + } + } + + if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)){ + restHighClient = ElasticRestHighClient.custom(endPoints, userName, + password, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG)); + }else{ + restHighClient = ElasticRestHighClient.custom(endPoints, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG)); + } + this.bulkProcessor = restHighClient.createBulk(buildListener(getTaskPluginCollector()), batchSize, bulkPerTask); + } @Override @@ -200,6 +363,9 @@ public void destroy() { throw DataXException.asDataXException(ElasticWriterErrorCode.CLOSE_EXCEPTION, e); } } + if(null != restHighClient){ + restHighClient.close(); + } } private void checkForErrors(HttpEntity responseEntity, int backendVersion) throws IOException { @@ -261,5 +427,38 @@ private static JsonNode parseResponse(HttpEntity responseEntity) throws IOExcep return mapper.readValue(responseEntity.getContent(), JsonNode.class); } + private BulkProcessor.Listener buildListener(final TaskPluginCollector pluginCollector){ + return new BulkProcessor.Listener() { + @Override + public void beforeBulk(long l, BulkRequest bulkRequest) { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + logger.trace("do_bulk: " + bulkRequest.getDescription()); + } + + @Override + public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) { + BulkItemResponse[] response = bulkResponse.getItems(); + for (BulkItemResponse itemResponse : response) { + if (itemResponse.isFailed()) { + List message = new ArrayList<>(); + message.add(String.valueOf(itemResponse.getFailure().getStatus().getStatus())); + message.add(itemResponse.getId()); + message.add(itemResponse.getFailureMessage()); + pluginCollector.collectDirtyRecord(new DirtyRecord(), null, JSON.toJSONString(message)); + } + } + } + + @Override + public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { + //Ignore interrupted error + if(!(throwable instanceof InterruptedException)){ + logger.error(throwable.getMessage(), throwable); + } + bulkError = true; + } + }; + } + } }