diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java deleted file mode 100644 index a21ab2a4f..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.base; - -public class EsConstants { - - public static final int DEFAULT_ES_CONNECTION_REQUEST_TIMEOUT = 10000; - public static final int DEFAULT_ES_CONNECTION_TIMEOUT = 10000; - public static final int DEFAULT_ES_SOCKET_TIMEOUT = 60000; - - public static final int ILLEGAL_REST_STATUS_CODE = -1; - - public static final String DEFAULT_BULK_FLUSH_MAX_ACTIONS = "300"; - public static final String DEFAULT_BULK_FLUSH_MAX_SIZE_MB = "10"; - public static final String DEFAULT_BULK_FLUSH_INTERVAL = "1000"; - - public static final String BACKOFF_POLICY_CONSTANT = "CONSTANT"; - public static final String BACKOFF_POLICY_EXPONENTIAL = "EXPONENTIAL"; - public static final String BACKOFF_POLICY_NONE = "NONE"; - - public static final String KEY_NULL_LITERAL = ""; - - public static final String FIELD_NAME_INDEX = "_index"; - public static final String FIELD_NAME_TYPE = "_type"; - public static final String FIELD_NAME_OP_TYPE = "_op_type"; - public static final String FIELD_NAME_ID = "_id"; - public static final String FIELD_NAME_ROUTING = "_routing"; - public static final String FIELD_NAME_VERSION = "_version"; - public static final String FIELD_NAME_SOURCE = "_source"; - public static final String FIELD_NAME_SCRIPT = "_script"; - - public static final long DEFAULT_VERSION = -1; - - public static final String OPERATION_TYPE_DELETE = "delete"; - public static final String OPERATION_TYPE_INDEX = "index"; - public static final String OPERATION_TYPE_CREATE = "create"; - public static final String OPERATION_TYPE_UPDATE = "update"; - public static final String OPERATION_TYPE_UPSERT = "upsert"; - public static final String DEFAULT_OPERATION_TYPE = OPERATION_TYPE_INDEX; - - public static final String ES_CONNECTOR_NAME = "elasticsearch"; -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/NetUtil.java 
b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/NetUtil.java deleted file mode 100644 index 6e43771f0..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/NetUtil.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.base; - -import com.bytedance.bitsail.common.util.Preconditions; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class NetUtil { - - private static final String IPV6_HTTP_PORT_FORMAT = "(\\[.*\\]):(\\d+)"; - - public static boolean isIpv4Address(String h) { - String[] tok = h.split(":"); - return tok.length == 2; - } - - public static String getIpv4Ip(String h) { - Preconditions.checkState(isIpv4Address(h)); - return h.split(":")[0]; - } - - public static int getIpv4Port(String h) { - Preconditions.checkState(isIpv4Address(h)); - return Integer.parseInt(h.split(":")[1]); - } - - public static boolean isIpv6Address(String h) { - Matcher httpMatcher = Pattern.compile(IPV6_HTTP_PORT_FORMAT).matcher(h); - return httpMatcher.find(); - } - - public static String getIpv6Ip(String h) { - Preconditions.checkState(isIpv6Address(h)); - Matcher httpMatcher = Pattern.compile(IPV6_HTTP_PORT_FORMAT).matcher(h); - return httpMatcher.find() ? 
httpMatcher.group(1) : null; - } - - public static int getIpv6Port(String h) { - Preconditions.checkState(isIpv6Address(h)); - Matcher httpMatcher = Pattern.compile(IPV6_HTTP_PORT_FORMAT).matcher(h); - return httpMatcher.find() ? Integer.parseInt(httpMatcher.group(2)) : 0; - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/constants/Elasticsearchs.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/constants/Elasticsearchs.java new file mode 100644 index 000000000..097685ade --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/constants/Elasticsearchs.java @@ -0,0 +1,22 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.elasticsearch.constants; + +public class Elasticsearchs { + + public static final String COMMA = ","; +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocConstructor.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocConstructor.java deleted file mode 100644 index e5428773d..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocConstructor.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.doc; - -import com.bytedance.bitsail.common.model.ColumnInfo; -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.doc.parameter.EsDocParameters; - -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.serializer.SerializerFeature; -import org.apache.commons.lang3.StringUtils; - -import java.util.List; -import java.util.Map; -import java.util.Objects; - -public class EsDocConstructor { - - private final EsDocParameters esDocParameters; - - public EsDocConstructor(EsDocParameters esDocParameters) { - this.esDocParameters = esDocParameters; - } - - public String form(Row row) { - List columns = esDocParameters.getColumns(); - boolean ignoreBlankValue = esDocParameters.isIgnoreBlankValue(); - List excludeFieldsIndices = esDocParameters.getExcludedFieldsIndices(); - Integer dynamicFieldIndex = esDocParameters.getDynamicFieldIndex(); - boolean flattenMap = esDocParameters.isFlattenMap(); - List jsonFeatures = esDocParameters.getJsonFeatures(); - - JSONObject jsonObject = new JSONObject(); - String columnName; - String columnType; - Object columnValue; - - Object[] fields = row.getFields(); - for (int i = 0; i < fields.length; i++) { - columnName = columns.get(i).getName(); - columnType = columns.get(i).getType(); - columnValue = fields[i]; - - if (ignoreBlankValue && isBlankValue(columnValue)) { - continue; - } - - if (!excludeFieldsIndices.isEmpty() && excludeFieldsIndices.contains(i)) { - continue; - } - - if (dynamicFieldIndex != null && dynamicFieldIndex == i) { - continue; - } - - if (flattenMap && Objects.nonNull(columnValue) && columnType.trim().toUpperCase().startsWith("MAP")) { - ((Map) columnValue).forEach((k, v) -> { - if (ignoreBlankValue && isBlankValue(v)) { - return; - } - jsonObject.put(k.toString(), v); - }); - continue; - } - - jsonObject.put(columnName, columnValue); - } - - String doc = JSONObject.toJSONString(jsonObject, 
jsonFeatures.toArray(new SerializerFeature[0])); - - return StringUtils.isEmpty(doc) ? null : doc; - } - - private boolean isBlankValue(Object columnValue) { - if (Objects.nonNull(columnValue) && columnValue instanceof Number) { - return ((Number) columnValue).intValue() == 0; - } - return false; - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsRequestConstructor.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsRequestConstructor.java deleted file mode 100644 index 7311de237..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsRequestConstructor.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.doc; - -import com.bytedance.bitsail.common.BitSailException; -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.exception.CommonErrorCode; -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.doc.parameter.EsDocParameters; -import com.bytedance.bitsail.connector.elasticsearch.doc.tools.EsKeySelector; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; - -import org.apache.commons.lang3.StringUtils; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; - -import java.io.Serializable; -import java.util.List; -import java.util.Objects; - -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.DEFAULT_VERSION; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.OPERATION_TYPE_CREATE; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.OPERATION_TYPE_DELETE; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.OPERATION_TYPE_INDEX; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.OPERATION_TYPE_UPDATE; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.OPERATION_TYPE_UPSERT; - -public class EsRequestConstructor implements Serializable { - - private final String esIndex; - private final String defaultOperationType; - - private final Integer dynamicFieldIndex; - private final Integer opTypeIndex; - private final Integer versionIndex; - - private final 
EsKeySelector docIdSelector; - private final EsKeySelector routingSelector; - private final EsDocConstructor esDocConstructor; - - public EsRequestConstructor(BitSailConfiguration jobConf, - EsDocParameters esDocParameters) { - this.esIndex = jobConf.get(ElasticsearchWriterOptions.ES_INDEX); - this.defaultOperationType = jobConf.get(ElasticsearchWriterOptions.ES_OPERATION_TYPE); - - this.dynamicFieldIndex = esDocParameters.getDynamicFieldIndex(); - this.opTypeIndex = esDocParameters.getOpTypeIndex(); - this.versionIndex = esDocParameters.getVersionIndex(); - - final List docIdFieldsIndices = esDocParameters.getIdFieldsIndices(); - final List routingFieldsIndices = esDocParameters.getRoutingFieldsIndices(); - this.docIdSelector = new EsKeySelector(docIdFieldsIndices, esDocParameters.getIdDelimiter()); - this.routingSelector = new EsKeySelector(routingFieldsIndices, esDocParameters.getIdDelimiter()); - this.esDocConstructor = new EsDocConstructor(esDocParameters); - } - - private String getIndex(Row row) { - return Objects.isNull(dynamicFieldIndex) ? esIndex : String.valueOf(row.getField(dynamicFieldIndex)); - } - - private String getOpType(Row row) { - return Objects.isNull(opTypeIndex) ? defaultOperationType : String.valueOf(row.getField(opTypeIndex)); - } - - private long getVersion(Row row) { - return Objects.isNull(versionIndex) ? 
DEFAULT_VERSION : Long.parseLong(row.getField(versionIndex).toString()); - } - - public ActionRequest createRequest(Row row) { - String index = getIndex(row); - String opType = getOpType(row); - String id = docIdSelector.getKey(row); - String source = esDocConstructor.form(row); - String routing = routingSelector.getKey(row); - long version = getVersion(row); - - return createRequest(index, opType, id, source, routing, version); - } - - private ActionRequest createRequest(String index, String opType, String id, String source, String routing, long version) { - ActionRequestValidationException validationException; - switch (opType.toLowerCase().trim()) { - case OPERATION_TYPE_DELETE: - DeleteRequest deleteRequest = new DeleteRequest().index(index).id(id); - if (version > 0) { - deleteRequest.version(version).versionType(VersionType.EXTERNAL_GTE); - } - if (!Strings.isEmpty(routing)) { - deleteRequest.routing(routing); - } - - validationException = deleteRequest.validate(); - if (validationException != null) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! construct delete request failed! request is: %s, reason is: %s", - deleteRequest, validationException)); - } - return deleteRequest; - case OPERATION_TYPE_INDEX: - IndexRequest indexRequest = new IndexRequest().index(index).id(id); - if (StringUtils.isEmpty(source)) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! construct index request failed, because _doc is empty!, request is : %s", indexRequest)); - } - indexRequest.source(source, XContentType.JSON); - if (version > 0) { - indexRequest.version(version).versionType(VersionType.EXTERNAL_GTE); - } - if (!Strings.isEmpty(routing)) { - indexRequest.routing(routing); - } - validationException = indexRequest.validate(); - if (validationException != null) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! construct index request failed! 
request is: %s, reason is: %s", - indexRequest, validationException)); - } - return indexRequest; - case OPERATION_TYPE_CREATE: - IndexRequest createRequest = new IndexRequest().index(index).id(id); - createRequest.create(true); - if (StringUtils.isEmpty(source)) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! construct create request failed, because _doc is empty!, request is : %s", createRequest)); - } - createRequest.source(source, XContentType.JSON); - if (version > 0) { - createRequest.version(version).versionType(VersionType.EXTERNAL_GTE); - } - if (!Strings.isEmpty(routing)) { - createRequest.routing(routing); - } - validationException = createRequest.validate(); - if (validationException != null) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! construct create request failed! request is: %s, reason is: %s", - createRequest, validationException)); - } - return createRequest; - case OPERATION_TYPE_UPDATE: - UpdateRequest updateRequest = new UpdateRequest().index(index).id(id); - if (source != null) { - updateRequest.doc(source, XContentType.JSON); - } - if (!Strings.isEmpty(routing)) { - updateRequest.routing(routing); - } - validationException = updateRequest.validate(); - if (validationException != null) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! construct update request failed! request is: %s, reason is: %s", - updateRequest, validationException)); - } - return updateRequest; - case OPERATION_TYPE_UPSERT: - UpdateRequest upsertRequest = new UpdateRequest().index(index).id(id); - if (source != null) { - upsertRequest.doc(source, XContentType.JSON); - upsertRequest.docAsUpsert(true); - } - if (!Strings.isEmpty(routing)) { - upsertRequest.routing(routing); - } - validationException = upsertRequest.validate(); - if (upsertRequest.validate() != null) { - throw new BitSailException(CommonErrorCode.RUNTIME_ERROR, String.format( - "oops! 
construct upsert request failed! request is: %s, reason is: %s", - upsertRequest, validationException)); - } - return upsertRequest; - default: - throw new RuntimeException("addCustomRequestToIndexer can not reach it"); - } - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/parameter/EsDocParameters.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/parameter/EsDocParameters.java deleted file mode 100644 index 95c11a9d3..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/parameter/EsDocParameters.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.doc.parameter; - -import com.bytedance.bitsail.common.model.ColumnInfo; - -import com.alibaba.fastjson.serializer.SerializerFeature; -import lombok.Builder; -import lombok.Data; - -import java.io.Serializable; -import java.util.List; - -@Data -@Builder -public class EsDocParameters implements Serializable { - /** - * The indices of record fields that are used to producing document '_id'. - */ - private List idFieldsIndices; - - /** - * The indices of record fields that will not be inserted into Elasticsearch document. 
- */ - private List excludedFieldsIndices; - - /** - * The indices of record fields that are used for routing. - */ - private List routingFieldsIndices; - - /** - * Delimiter used for construct composite document '_id' and routing id. - */ - private String idDelimiter; - - /** - * The index of dynamic field in records. - * We use dynamic field to determine what Elasticsearch index to insert for a record. - * Notice that, dynamic field will not be included in document. - */ - private Integer dynamicFieldIndex; - - /** - * The index of operation type. - */ - private Integer opTypeIndex; - - /** - * The index of version. - */ - private Integer versionIndex; - - /** - * Option for determining if empty fields in record should be included in document. - */ - private boolean ignoreBlankValue; - - /** - * Option for determining if map entries should be expanded in document. - */ - private boolean flattenMap; - - /** - * User defined json features for creating json format document. - */ - private List jsonFeatures; - - /** - * Used to format json document. - */ - private List columns; -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/tools/EsKeySelector.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/tools/EsKeySelector.java deleted file mode 100644 index 9ce9b435d..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/doc/tools/EsKeySelector.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.doc.tools; - -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.base.EsConstants; - -import lombok.AllArgsConstructor; - -import java.io.Serializable; -import java.util.List; - -@AllArgsConstructor -public class EsKeySelector implements Serializable { - - private final List fieldsIndices; - private final String delimiter; - - public String getKey(Row row) { - final StringBuilder builder = new StringBuilder(); - for (int i = 0; i < fieldsIndices.size(); i++) { - if (i > 0) { - builder.append(delimiter); - } - final Object value = row.getField(fieldsIndices.get(i)); - if (value == null) { - builder.append(EsConstants.KEY_NULL_LITERAL); - } else { - builder.append(value); - } - } - return builder.toString(); - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/exception/ElasticsearchErrorCode.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/exception/ElasticsearchErrorCode.java new file mode 100644 index 000000000..fe2895ea3 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/exception/ElasticsearchErrorCode.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.elasticsearch.exception;
+
+import com.bytedance.bitsail.common.exception.ErrorCode;
+
+public enum ElasticsearchErrorCode implements ErrorCode {
+
+  ELASTICSEARCH_WRITER_ERROR("Elastic-00", "Elasticsearch write failed with unexpected reason."),
+  ELASTICSEARCH_FIELD_NOT_EXIST_ERROR("Elastic-01", "Elasticsearch configuration not correct.");
+
+  private final String code;
+
+  private final String describe;
+
+  ElasticsearchErrorCode(String code, String describe) {
+    this.code = code;
+    this.describe = describe;
+  }
+
+  @Override
+  public String getCode() {
+    return code;
+  }
+
+  @Override
+  public String getDescription() {
+    return describe;
+  }
+
+}
\ No newline at end of file
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/DefaultRowSerializationSchema.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/DefaultRowSerializationSchema.java
new file mode 100644
index 000000000..10339abe2
--- /dev/null
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/DefaultRowSerializationSchema.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.elasticsearch.format;
+
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.exception.CommonErrorCode;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.row.RowKind;
+import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
+import com.bytedance.bitsail.common.typeinfo.TypeInfoValueConverter;
+import com.bytedance.bitsail.connector.elasticsearch.constants.Elasticsearchs;
+import com.bytedance.bitsail.connector.elasticsearch.format.extractor.DefaultFieldExtractor;
+import com.bytedance.bitsail.connector.elasticsearch.format.extractor.DefaultValueExtractor;
+import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchOptions;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Default {@link ElasticsearchRowSerializationSchema} that turns a {@link Row} into an
+ * Elasticsearch write request chosen by the row's {@link RowKind}.
+ *
+ * <ul>
+ *   <li>{@code INSERT} becomes an {@link IndexRequest}.</li>
+ *   <li>{@code UPDATE_AFTER} becomes an {@link UpdateRequest} with {@code docAsUpsert(true)}.</li>
+ *   <li>{@code UPDATE_BEFORE} and {@code DELETE} become a {@link DeleteRequest}.</li>
+ * </ul>
+ */
+public class DefaultRowSerializationSchema implements ElasticsearchRowSerializationSchema {
+
+  /** Resolves the target index: a fixed name, or the value of the configured index key field. */
+  private final Function<Row, String> indexNameFunction;
+  /** Builds the document id from the configured id key fields. */
+  private final Function<Row, String> primaryKeyFunction;
+  /** Extracts the routing value from the configured routing key field. */
+  private final Function<Row, String> routingKeyFunction;
+  /** Produces the document string that is sent as {@link XContentType#JSON}. */
+  private final Function<Row, String> documentFunction;
+
+  /**
+   * Builds the extractors once so that {@link #serialize(Row)} only applies them.
+   *
+   * @param commonConfiguration configuration handed to the {@link TypeInfoValueConverter}
+   * @param writerConfiguration writer options: index, id key fields, id delimiter, routing key field
+   * @param rowTypeInfo         type info of the incoming rows, passed to every extractor
+   */
+  public DefaultRowSerializationSchema(BitSailConfiguration commonConfiguration,
+                                       BitSailConfiguration writerConfiguration,
+                                       RowTypeInfo rowTypeInfo) {
+    TypeInfoValueConverter valueConverter = new TypeInfoValueConverter(commonConfiguration);
+    this.indexNameFunction = getIndexNameFunction(valueConverter, writerConfiguration, rowTypeInfo);
+    this.primaryKeyFunction = DefaultFieldExtractor.createFieldsExtractor(
+        valueConverter,
+        rowTypeInfo,
+        writerConfiguration.get(ElasticsearchOptions.ID_FIELD_DELIMITER),
+        splitFields(writerConfiguration.get(ElasticsearchOptions.ID_KEY_FIELDS)));
+    this.routingKeyFunction = DefaultFieldExtractor.createFieldExtractor(valueConverter, rowTypeInfo,
+        writerConfiguration.get(ElasticsearchOptions.ROUTING_KEY_FIELD));
+    this.documentFunction = new DefaultValueExtractor(valueConverter, rowTypeInfo);
+  }
+
+  /**
+   * Splits a comma separated list of field names.
+   *
+   * @param fields the raw option value, may be null
+   * @return the field names, or an empty list when {@code fields} is null
+   */
+  private static List<String> splitFields(String fields) {
+    // StringUtils.split returns null for a null input and Lists.newArrayList rejects a null
+    // array, so a missing option has to be mapped to an empty list explicitly.
+    String[] names = StringUtils.split(fields, Elasticsearchs.COMMA);
+    if (names == null) {
+      return Lists.newArrayList();
+    }
+    return Lists.newArrayList(names);
+  }
+
+  /**
+   * Chooses how the index name is resolved: the fixed index option wins, otherwise the value
+   * of the index key field is read from each row.
+   *
+   * @return function that maps a row to its index name
+   * @throws BitSailException with {@link CommonErrorCode#CONFIG_ERROR} when both options are empty
+   */
+  protected static Function<Row, String> getIndexNameFunction(TypeInfoValueConverter valueConverter,
+                                                              BitSailConfiguration writerConfiguration,
+                                                              RowTypeInfo rowTypeInfo) {
+    String indexName = writerConfiguration.get(ElasticsearchOptions.INDEX);
+    if (StringUtils.isNotEmpty(indexName)) {
+      // The intersection cast makes the lambda serializable; the schema interface is Serializable
+      // and this function is stored in one of its fields.
+      return (Function<Row, String> & Serializable) row -> indexName;
+    }
+
+    String indexKeyField = writerConfiguration.get(ElasticsearchOptions.INDEX_KEY_FIELD);
+    if (StringUtils.isNotEmpty(indexKeyField)) {
+      return DefaultFieldExtractor.createFieldExtractor(
+          valueConverter,
+          rowTypeInfo,
+          indexKeyField);
+    }
+
+    throw BitSailException.asBitSailException(CommonErrorCode.CONFIG_ERROR,
+        String.format("Index name can't be confirm from configuration, please check the config name %s or %s.",
+            ElasticsearchOptions.INDEX.key(), ElasticsearchOptions.INDEX_KEY_FIELD.key()));
+  }
+
+  /**
+   * Converts a row into the write request matching its {@link RowKind}.
+   *
+   * @param row the row to write
+   * @return an index, update (upsert) or delete request
+   * @throws BitSailException with {@link CommonErrorCode#INTERNAL_ERROR} for any other row kind
+   */
+  @Override
+  public DocWriteRequest<?> serialize(Row row) {
+    String indexName = indexNameFunction.apply(row);
+    String id = primaryKeyFunction.apply(row);
+    String routing = routingKeyFunction.apply(row);
+    RowKind kind = row.getKind();
+    switch (kind) {
+      case INSERT:
+        //id could be null
+        return new IndexRequest()
+            .index(indexName)
+            .id(id)
+            .routing(routing)
+            .source(documentFunction.apply(row), XContentType.JSON);
+      case UPDATE_AFTER:
+        //id can't be null
+        return new UpdateRequest()
+            .index(indexName)
+            .id(id)
+            .routing(routing)
+            .doc(documentFunction.apply(row), XContentType.JSON)
+            .docAsUpsert(true);
+
+      case UPDATE_BEFORE:
+      case DELETE:
+        //id can't be null
+        return new DeleteRequest()
+            .index(indexName)
+            .id(id)
+            .routing(routing);
+      default:
+        throw BitSailException.asBitSailException(
+            CommonErrorCode.INTERNAL_ERROR,
+            "Unsupported row kind: " + kind);
+    }
+  }
+}
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/ElasticsearchRowSerializationSchema.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/ElasticsearchRowSerializationSchema.java
new file mode 100644
index 000000000..330d92b50
--- /dev/null
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/ElasticsearchRowSerializationSchema.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.bytedance.bitsail.connector.elasticsearch.format; + +import com.bytedance.bitsail.common.row.Row; + +import org.elasticsearch.action.DocWriteRequest; + +import java.io.Serializable; + +public interface ElasticsearchRowSerializationSchema extends Serializable { + + DocWriteRequest serialize(Row row); + +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/extractor/DefaultFieldExtractor.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/extractor/DefaultFieldExtractor.java new file mode 100644 index 000000000..e9d4ffb0a --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/extractor/DefaultFieldExtractor.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.elasticsearch.format.extractor; + +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfoValueConverter; +import com.bytedance.bitsail.common.typeinfo.TypeInfos; +import com.bytedance.bitsail.connector.elasticsearch.exception.ElasticsearchErrorCode; + +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** Extracts a string key from a {@link Row} by converting the configured fields to strings and joining them with the delimiter. NOTE(review): type arguments look stripped from this patch text (raw {@code Function} and {@code List}); presumably {@code Function<Row, String>}, {@code List<Integer>} and {@code List<String>} -- confirm against the repository. */ public class DefaultFieldExtractor implements Function, Serializable { + + private final String delimiter; + + private final List fieldIndexes; + + private final RowTypeInfo rowTypeInfo; + + private final TypeInfoValueConverter valueConverter; + + /** Resolves every configured field name to its position in {@code rowTypeInfo}; a name for which {@code indexOf} returns -1 is rejected with ELASTICSEARCH_FIELD_NOT_EXIST_ERROR. */ private DefaultFieldExtractor(TypeInfoValueConverter valueConverter, + RowTypeInfo rowTypeInfo, + String delimiter, + List fields) { + this.delimiter = delimiter; + this.rowTypeInfo = rowTypeInfo; + this.valueConverter = valueConverter; + this.fieldIndexes = new ArrayList<>(fields.size()); + for (String field : fields) { + int i = rowTypeInfo.indexOf(field); + if (i == -1) { + throw BitSailException.asBitSailException(ElasticsearchErrorCode.ELASTICSEARCH_FIELD_NOT_EXIST_ERROR, + String.format("Elasticsearch configure field %s not exists in row type info.", field)); + } + fieldIndexes.add(i); + } + } + + /** Multi-field factory: when {@code CollectionUtils.isEmpty(fields)} holds, returns a function that always yields null; otherwise returns a {@link DefaultFieldExtractor} over those fields. */ public static Function createFieldsExtractor(TypeInfoValueConverter valueConverter, + RowTypeInfo rowTypeInfo, + String delimiter, + List fields) { + if (CollectionUtils.isEmpty(fields)) { + return row -> null; + } + return new DefaultFieldExtractor(valueConverter, + rowTypeInfo, + delimiter, + fields); + } + + /** Single-field factory: when {@code StringUtils.isEmpty(field)} holds, returns a function that always yields null; otherwise wraps the field in a one-element list and uses an empty delimiter. */ public static Function createFieldExtractor(TypeInfoValueConverter valueConverter, + RowTypeInfo
rowTypeInfo, + String field) { + if (StringUtils.isEmpty(field)) { + return row -> null; + } + return new DefaultFieldExtractor(valueConverter, + rowTypeInfo, + StringUtils.EMPTY, + Lists.newArrayList(field)); + } + + /** Converts each configured field of {@code row} to a String through the value converter and joins the results with the delimiter. */ @Override + public String apply(Row row) { + List values = Lists.newArrayListWithCapacity(fieldIndexes.size()); + for (int index = 0; index < fieldIndexes.size(); index++) { + Integer fieldIndex = fieldIndexes.get(index); + Object value = row.getField(fieldIndex); + values.add((String) valueConverter.convertObject(value, TypeInfos.STRING_TYPE_INFO)); + } + + return StringUtils.join(values, delimiter); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/extractor/DefaultValueExtractor.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/extractor/DefaultValueExtractor.java new file mode 100644 index 000000000..6036a02e2 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/format/extractor/DefaultValueExtractor.java @@ -0,0 +1,117 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.bytedance.bitsail.connector.elasticsearch.format.extractor; + +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfoValueConverter; +import com.bytedance.bitsail.common.typeinfo.TypeInfos; +import com.bytedance.bitsail.common.util.JsonSerializer; + +import com.google.common.collect.Maps; + +import java.io.Serializable; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +/** Serializes a {@link Row} into a JSON document string keyed by the field names of the row type. NOTE(review): type arguments look stripped from this patch text (raw {@code Function}, {@code Map}, {@code Class}, {@code TypeInfo}) -- confirm against the repository. */ public class DefaultValueExtractor implements Function, Serializable { + + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter + .ofPattern("yyyy-MM-dd"); + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter + .ofPattern("yyyy-MM-dd HH:mm:ss"); + + private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter + .ofPattern("HH:mm:ss"); + + private final RowTypeInfo rowTypeInfo; + + //TODO use value converter to convert value to string.
+ private final TypeInfoValueConverter valueConverter; + + /** Stores the converter and the row type; the methods of this class read only {@code rowTypeInfo}, the converter is kept for the TODO on its field. */ public DefaultValueExtractor(TypeInfoValueConverter valueConverter, + RowTypeInfo rowTypeInfo) { + this.valueConverter = valueConverter; + this.rowTypeInfo = rowTypeInfo; + } + + /** Builds a name-to-value map for positions 0 to row.getArity() - 1, passing each field through {@link #convert}, and serializes the map with JsonSerializer. */ @Override + public String apply(Row row) { + Map document = Maps.newHashMapWithExpectedSize(row.getArity()); + for (int index = 0; index < row.getArity(); index++) { + String name = rowTypeInfo.getFieldNames()[index]; + Object value = convert(row.getField(index), rowTypeInfo.getTypeInfos()[index]); + document.put(name, value); + } + return JsonSerializer.serialize(document); + } + + /** Returns null for a null value or a VOID type; formats java.time and java.sql date/time values with the formatters declared in this class; returns every other value unchanged. */ public Object convert(Object value, TypeInfo typeInfo) { + if (Objects.isNull(value)) { + return null; + } + Class typeClass = typeInfo.getTypeClass(); + if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == typeClass) { + return null; + } + + if (TypeInfos.LOCAL_DATE_TYPE_INFO.getTypeClass() == typeClass) { + return ((LocalDate) value) + .format(DATE_FORMATTER); + } + + if (TypeInfos.LOCAL_DATE_TIME_TYPE_INFO.getTypeClass() == typeClass) { + return ((LocalDateTime) value) + .format(DATE_TIME_FORMATTER); + } + + if (TypeInfos.LOCAL_TIME_TYPE_INFO.getTypeClass() == typeClass) { + return ((LocalTime) value) + .format(TIME_FORMATTER); + } + + if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == typeClass) { + LocalDateTime localDateTime = ((Timestamp) value).toLocalDateTime(); + return localDateTime.format(DATE_TIME_FORMATTER); + } + + if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == typeClass) { + Date date = (java.sql.Date) value; + return date.toLocalDate() + .format(DATE_FORMATTER); + } + + if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == typeClass) { + Time time = (java.sql.Time) value; + return time.toLocalTime() + .format(TIME_FORMATTER); + } + + return value; + } + +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchOptions.java
b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchOptions.java new file mode 100644 index 000000000..7b756d618 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchOptions.java @@ -0,0 +1,109 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.option; + +import com.bytedance.bitsail.common.option.ConfigOption; + +import java.io.Serializable; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; + +/** Option keys for the Elasticsearch connector; some keys also declare an alias through {@code withAlias}. NOTE(review): the {@code ConfigOption} declarations appear raw in this patch text; type arguments were presumably lost in extraction -- confirm against the repository. */ public interface ElasticsearchOptions extends Serializable { + + /* Connection and target-index settings: none of the next five options declares a default value. */ ConfigOption HOSTS = + key("hosts") + .noDefaultValue(String.class) + .withAlias("es_hosts"); + + ConfigOption USERNAME = + key("username") + .noDefaultValue(String.class) + .withAlias("es_username"); + + ConfigOption PASSWORD = + key("password") + .noDefaultValue(String.class) + .withAlias("es_password"); + + ConfigOption PATH_PREFIX = + key("path_prefix") + .noDefaultValue(String.class); + + ConfigOption INDEX = + key("index") + .noDefaultValue(String.class) + .withAlias("es_index"); + + /** Defaults to "#"; presumably the separator placed between the values of several id key fields -- confirm against the code that reads this option. */ ConfigOption ID_FIELD_DELIMITER = + key("id_delimiter") + .defaultValue("#"); + + ConfigOption ID_KEY_FIELDS = + key("id_key_fields") + .noDefaultValue(String.class) + .withAlias("es_id_fields"); + +
ConfigOption ROUTING_KEY_FIELD = + key("routing_key") + .noDefaultValue(String.class) + .withAlias("es_shard_routing_fields"); + + ConfigOption INDEX_KEY_FIELD = + key("index_key_field") + .noDefaultValue(String.class) + .withAlias("es_dynamic_index_field"); + + + /* HTTP client timeouts; presumably milliseconds, per the _ms suffix -- confirm. */ ConfigOption CONNECTION_REQUEST_TIMEOUT_MS = + key("connection_request_timeout_ms") + .defaultValue(10000); + + ConfigOption CONNECTION_TIMEOUT_MS = + key("connection_timeout_ms") + .defaultValue(10000); + + ConfigOption SOCKET_TIMEOUT_MS = + key("socket_timeout_ms") + .defaultValue(60000); + + /* Bulk write tuning; the aliases on the next three options presumably keep earlier key names working -- confirm. */ @SuppressWarnings("checkstyle:MagicNumber") + ConfigOption BULK_MAX_BATCH_COUNT = + key("bulk_max_batch_count") + .defaultValue(300) + .withAlias("bulk_flush_max_actions"); + + @SuppressWarnings("checkstyle:MagicNumber") + ConfigOption BULK_MAX_BATCH_SIZE = + key("bulk_max_batch_size") + .defaultValue(10) + .withAlias("bulk_flush_max_size_mb"); + + @SuppressWarnings("checkstyle:MagicNumber") + ConfigOption BULK_MAX_RETRY_COUNT = + key("bulk_max_retry_count") + .defaultValue(5) + .withAlias("bulk_backoff_max_retry_count"); + + /** Defaults to 60 * 1000L, i.e. 60000; presumably milliseconds, per the _ms suffix -- confirm. */ ConfigOption BULK_FLUSH_INTERVAL_MS = + key("bulk_flush_interval_ms") + .defaultValue(60 * 1000L); + + ConfigOption BULK_BACKOFF_DELAY_MS = + key("bulk_backoff_delay_ms") + .defaultValue(100); +} + diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchWriterOptions.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchWriterOptions.java deleted file mode 100644 index 905cfe1bc..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchWriterOptions.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.option; - -import com.bytedance.bitsail.common.option.ConfigOption; -import com.bytedance.bitsail.common.option.WriterOptions; - -import com.alibaba.fastjson.TypeReference; - -import java.util.List; - -import static com.bytedance.bitsail.common.option.ConfigOptions.key; - -public interface ElasticsearchWriterOptions extends WriterOptions.BaseWriterOptions { - ConfigOption> ES_HOSTS = - key(WriterOptions.WRITER_PREFIX + "es_hosts") - .onlyReference(new TypeReference>() { - }); - - ConfigOption REQUEST_PATH_PREFIX = - key(WriterOptions.WRITER_PREFIX + "request_path_prefix") - .noDefaultValue(String.class); - - ConfigOption CONNECTION_REQUEST_TIMEOUT_MS = - key(WriterOptions.WRITER_PREFIX + "connection_request_timeout_ms") - .defaultValue(10000); - - ConfigOption CONNECTION_TIMEOUT_MS = - key(WriterOptions.WRITER_PREFIX + "connection_timeout_ms") - .defaultValue(10000); - - ConfigOption SOCKET_TIMEOUT_MS = - key(WriterOptions.WRITER_PREFIX + "socket_timeout_ms") - .defaultValue(60000); - - ConfigOption MAX_IGNORE_FAILED_REQUEST_THRESHOLD = - key(WriterOptions.WRITER_PREFIX + "max_ignore_failed_request_threshold") - .defaultValue(0); - - ConfigOption BULK_FLUSH_MAX_ACTIONS = - key(WriterOptions.WRITER_PREFIX + "bulk_flush_max_actions") - .defaultValue(300); - - ConfigOption BULK_FLUSH_MAX_SIZE_MB = - key(WriterOptions.WRITER_PREFIX + 
"bulk_flush_max_size_mb") - .defaultValue(10); - - ConfigOption BULK_FLUSH_INTERVAL_MS = - key(WriterOptions.WRITER_PREFIX + "bulk_flush_interval_ms") - .defaultValue(10000L); - - ConfigOption BULK_BACKOFF_POLICY = - key(WriterOptions.WRITER_PREFIX + "bulk_backoff_policy") - .defaultValue("EXPONENTIAL"); - - ConfigOption BULK_BACKOFF_DELAY_MS = - key(WriterOptions.WRITER_PREFIX + "bulk_backoff_delay_ms") - .defaultValue(100); - - ConfigOption BULK_BACKOFF_MAX_RETRY_COUNT = - key(WriterOptions.WRITER_PREFIX + "bulk_backoff_max_retry_count") - .defaultValue(5); - - ConfigOption ES_INDEX = - key(WriterOptions.WRITER_PREFIX + "es_index") - .noDefaultValue(String.class); - - ConfigOption ES_OPERATION_TYPE = - key(WriterOptions.WRITER_PREFIX + "es_operation_type") - .defaultValue("index"); - - ConfigOption ES_DYNAMIC_INDEX_FIELD = - key(WriterOptions.WRITER_PREFIX + "es_dynamic_index_field") - .noDefaultValue(String.class); - - ConfigOption ES_OPERATION_TYPE_FIELD = - key(WriterOptions.WRITER_PREFIX + "es_operation_type_field") - .noDefaultValue(String.class); - - ConfigOption ES_VERSION_FIELD = - key(WriterOptions.WRITER_PREFIX + "es_version_field") - .noDefaultValue(String.class); - - /** - * How to build elasticsearch from the columns. 
- */ - ConfigOption ES_ID_FIELDS = - key(WriterOptions.WRITER_PREFIX + "es_id_fields") - .defaultValue(""); - - ConfigOption IGNORE_BLANK_VALUE = - key(WriterOptions.WRITER_PREFIX + "ignore_blank_value") - .defaultValue(false); - - ConfigOption FLATTEN_MAP = - key(WriterOptions.WRITER_PREFIX + "flatten_map") - .defaultValue(false); - - ConfigOption SHARD_ROUTING_FIELDS = - key(WriterOptions.WRITER_PREFIX + "es_shard_routing_fields") - .defaultValue(""); - - ConfigOption DOC_EXCLUDE_FIELDS = - key(WriterOptions.WRITER_PREFIX + "doc_exclude_fields") - .defaultValue(""); - - ConfigOption DOC_ID_DELIMITER = - key(WriterOptions.WRITER_PREFIX + "id_delimiter") - .defaultValue("#"); - - - /** - * Custom fastjson serialization method, multiple values are separated by commas - * - * @see com.alibaba.fastjson.serializer.SerializerFeature - */ - ConfigOption JSON_SERIALIZER_FEATURES = - key(WriterOptions.WRITER_PREFIX + "json_serializer_features") - .noDefaultValue(String.class); -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRequestEmitter.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRequestEmitter.java deleted file mode 100644 index 7ac99f0a8..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRequestEmitter.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.rest; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.exception.CommonErrorCode; -import com.bytedance.bitsail.common.model.ColumnInfo; -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.common.util.FastJsonUtil; -import com.bytedance.bitsail.connector.elasticsearch.doc.EsRequestConstructor; -import com.bytedance.bitsail.connector.elasticsearch.doc.parameter.EsDocParameters; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; - -import com.alibaba.fastjson.serializer.SerializerFeature; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import org.apache.commons.lang3.StringUtils; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class EsRequestEmitter { - - private final EsRequestConstructor constructor; - - public EsRequestEmitter(BitSailConfiguration jobConf) { - EsDocParameters parameters = initEsDocParams(jobConf); - this.constructor = new EsRequestConstructor(jobConf, parameters); - } - - @VisibleForTesting - public 
static EsDocParameters initEsDocParams(BitSailConfiguration jobConf) { - String esIdFields = jobConf.get(ElasticsearchWriterOptions.ES_ID_FIELDS); - String idDelimiter = jobConf.get(ElasticsearchWriterOptions.DOC_ID_DELIMITER); - String docExcludeFields = jobConf.get(ElasticsearchWriterOptions.DOC_EXCLUDE_FIELDS); - String routingFields = jobConf.get(ElasticsearchWriterOptions.SHARD_ROUTING_FIELDS); - - boolean ignoreBlankValue = jobConf.get(ElasticsearchWriterOptions.IGNORE_BLANK_VALUE); - boolean flattenMap = jobConf.get(ElasticsearchWriterOptions.FLATTEN_MAP); - List jsonFeatures = FastJsonUtil.parseSerializerFeaturesFromConfig( - jobConf.get(ElasticsearchWriterOptions.JSON_SERIALIZER_FEATURES)); - - List columns = jobConf.getNecessaryOption(ElasticsearchWriterOptions.COLUMNS, CommonErrorCode.LACK_NECESSARY_FIELDS); - - List idFieldsIndices = new ArrayList<>(); - List excludeFieldsIndices = new ArrayList<>(); - List routingFieldsIndices = new ArrayList<>(); - if (StringUtils.isNotEmpty(esIdFields)) { - List fieldsNames = Arrays.asList(esIdFields.split(",\\s*")); - idFieldsIndices = getFieldsIndices(columns, fieldsNames); - } - - if (StringUtils.isNotEmpty(docExcludeFields)) { - List fieldsNames = Arrays.asList(docExcludeFields.split(",\\s*")); - excludeFieldsIndices = getFieldsIndices(columns, fieldsNames); - } - - if (StringUtils.isNotEmpty(routingFields)) { - List fieldsNames = Arrays.asList(routingFields.split(",\\s*")); - routingFieldsIndices = getFieldsIndices(columns, fieldsNames); - } - - Integer dynamicFieldIndex = jobConf.fieldExists(ElasticsearchWriterOptions.ES_DYNAMIC_INDEX_FIELD) ? - getFieldsIndices(columns, - ImmutableList.of(jobConf.get(ElasticsearchWriterOptions.ES_DYNAMIC_INDEX_FIELD))).get(0) - : null; - - Integer versionIndex = jobConf.fieldExists(ElasticsearchWriterOptions.ES_VERSION_FIELD) ? 
- getFieldsIndices(columns, ImmutableList.of(jobConf.get(ElasticsearchWriterOptions.ES_VERSION_FIELD))).get(0) - : null; - - Integer opTypeIndex = jobConf.fieldExists(ElasticsearchWriterOptions.ES_OPERATION_TYPE_FIELD) ? - getFieldsIndices(columns, ImmutableList.of(jobConf.get(ElasticsearchWriterOptions.ES_OPERATION_TYPE_FIELD))).get(0) - : null; - - return EsDocParameters.builder() - .idFieldsIndices(idFieldsIndices) - .excludedFieldsIndices(excludeFieldsIndices) - .routingFieldsIndices(routingFieldsIndices) - .dynamicFieldIndex(dynamicFieldIndex) - .opTypeIndex(opTypeIndex) - .versionIndex(versionIndex) - .ignoreBlankValue(ignoreBlankValue) - .flattenMap(flattenMap) - .columns(columns) - .jsonFeatures(jsonFeatures) - .idDelimiter(idDelimiter) - .build(); - } - - private static List getFieldsIndices(List columns, List fieldNames) { - Map columnIndexMap = new HashMap<>(); - for (int i = 0; i < columns.size(); i++) { - String name = columns.get(i).getName(); - columnIndexMap.put(name, i); - } - - return fieldNames.stream() - .map(fieldName -> { - if (columnIndexMap.containsKey(fieldName)) { - return columnIndexMap.get(fieldName); - } else { - throw new IllegalArgumentException("fieldName: " + fieldName + " is not in columns"); - } - }).collect(Collectors.toList()); - } - - public void emit(Row row, BulkProcessor bulkProcessor) { - ActionRequest request = constructor.createRequest(row); - emit(request, bulkProcessor); - } - - public void emit(ActionRequest request, BulkProcessor bulkProcessor) { - if (request instanceof IndexRequest) { - bulkProcessor.add((IndexRequest) request); - } else if (request instanceof DeleteRequest) { - bulkProcessor.add((DeleteRequest) request); - } else if (request instanceof UpdateRequest) { - bulkProcessor.add((UpdateRequest) request); - } else { - throw new RuntimeException("Unsupported request type: " + request.getClass()); - } - } -} diff --git 
a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java deleted file mode 100644 index f5721fd20..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.rest; - -import com.bytedance.bitsail.common.BitSailException; -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.exception.CommonErrorCode; -import com.bytedance.bitsail.common.util.Preconditions; -import com.bytedance.bitsail.connector.elasticsearch.base.NetUtil; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; - -import org.apache.commons.lang.StringUtils; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -public class EsRestClientBuilder { - private static final Logger LOG = LoggerFactory.getLogger(EsRestClientBuilder.class); - - private final RestClientBuilder builder; - - public EsRestClientBuilder(BitSailConfiguration jobConf) { - List hostAddressList = jobConf.get(ElasticsearchWriterOptions.ES_HOSTS); - List hosts = parseHostsAddress(hostAddressList); - Preconditions.checkState(!hosts.isEmpty(), "cannot find any valid host from configurations."); - LOG.info("Elasticsearch http client hosts: {}", hosts); - - builder = RestClient.builder(hosts.toArray(new HttpHost[0])); - configureBuilder(jobConf); - } - - /** - * Transform hosts strings into a list of {@link HttpHost} hosts. - * - * @param hosts A list of host strings. - * @return A list of hosts. 
- */ - private static List parseHostsAddress(List hosts) { - if (Objects.isNull(hosts) || hosts.isEmpty()) { - return Collections.emptyList(); - } - return hosts.stream().map(host -> { - if (NetUtil.isIpv6Address(host)) { - return new HttpHost(Preconditions.checkNotNull(NetUtil.getIpv6Ip(host)), NetUtil.getIpv6Port(host)); - } else if (NetUtil.isIpv4Address(host)) { - return new HttpHost(Preconditions.checkNotNull(NetUtil.getIpv4Ip(host)), NetUtil.getIpv4Port(host)); - } - throw new BitSailException(CommonErrorCode.CONFIG_ERROR, "invalid elasticsearch host: " + host); - }).collect(Collectors.toList()); - } - - public RestHighLevelClient build() { - return new RestHighLevelClient(builder); - } - - private void configureBuilder(BitSailConfiguration jobConf) { - String userName = jobConf.get(ElasticsearchWriterOptions.USER_NAME); - String password = jobConf.get(ElasticsearchWriterOptions.PASSWORD); - String requestPathPrefix = jobConf.get(ElasticsearchWriterOptions.REQUEST_PATH_PREFIX); - int connectionRequestTimeout = jobConf.get(ElasticsearchWriterOptions.CONNECTION_REQUEST_TIMEOUT_MS); - int connectionTimeout = jobConf.get(ElasticsearchWriterOptions.CONNECTION_TIMEOUT_MS); - int socketTimeout = jobConf.get(ElasticsearchWriterOptions.SOCKET_TIMEOUT_MS); - - if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(password)) { - final CredentialsProvider provider = new BasicCredentialsProvider(); - provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); - builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> - httpAsyncClientBuilder.setDefaultCredentialsProvider(provider)); - } - - if (StringUtils.isNotEmpty(requestPathPrefix)) { - builder.setPathPrefix(requestPathPrefix); - } - - builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder - .setConnectionRequestTimeout(connectionRequestTimeout) - .setConnectTimeout(connectionTimeout) - .setSocketTimeout(socketTimeout)); - } - -} diff --git 
a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkListener.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkListener.java deleted file mode 100644 index 1326342ac..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkListener.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.rest.bulk; - -import lombok.AllArgsConstructor; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.rest.RestStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.ILLEGAL_REST_STATUS_CODE; - -@AllArgsConstructor -public class EsBulkListener implements BulkProcessor.Listener { - private static final Logger LOGGER = LoggerFactory.getLogger(EsBulkListener.class); - - private static final int FAILURE_RESPONSE_IDENTIFIER = 0; - private static final int SUCCESS_RESPONSE_IDENTIFIER = 1; - - private final EsBulkRequestFailureHandler failureHandler; - private final AtomicReference failureThrowable; - private final AtomicInteger pendingActions; - - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOGGER.debug("before bulk: id={}, request={}", executionId, request); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - int successCount = 0; - try { - for (int i = 0; i < response.getItems().length; ++i) { - successCount += handleBulkItemResponse(response.getItems()[i], request.requests().get(i)); - } - } catch (Throwable t) { - failureThrowable.compareAndSet(null, t); - } - pendingActions.getAndAdd(-successCount); - } else { - pendingActions.getAndAdd(-request.numberOfActions()); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable 
failure) { - try { - for (DocWriteRequest req : request.requests()) { - handleFailedRequest(req, failure, ILLEGAL_REST_STATUS_CODE); - } - } catch (Throwable t) { - failureThrowable.compareAndSet(null, t); - } - pendingActions.getAndAdd(-request.numberOfActions()); - } - - /** - * Handle each response. - * @param response A response of a bulk requests. - * @param actionRequest The corresponding request. - * @return Returns 0 if it is a failure response. Otherwise 1. - * @throws Throwable Exceptions thrown when handling failure response. - */ - private int handleBulkItemResponse(BulkItemResponse response, DocWriteRequest actionRequest) throws Throwable { - if (response.isFailed() && Objects.nonNull(response.getFailure().getCause())) { - Throwable failure = response.getFailure().getCause(); - RestStatus restStatus = response.getFailure().getStatus(); - int restStatusCode = Objects.isNull(restStatus) ? ILLEGAL_REST_STATUS_CODE : restStatus.getStatus(); - handleFailedRequest(actionRequest, failure, restStatusCode); - return FAILURE_RESPONSE_IDENTIFIER; - } - return SUCCESS_RESPONSE_IDENTIFIER; - } - - private void handleFailedRequest(DocWriteRequest actionRequest, Throwable failure, int restStatusCode) throws Throwable { - if (actionRequest instanceof ActionRequest) { - failureHandler.onFailure((ActionRequest) actionRequest, failure, restStatusCode); - } else { - throw new IOException("Found request which is not ActionRequest: " + actionRequest.getClass()); - } - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkProcessorBuilder.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkProcessorBuilder.java deleted file mode 100644 index 4a663ac8d..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkProcessorBuilder.java +++ /dev/null 
@@ -1,94 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.rest.bulk; - -import com.bytedance.bitsail.common.BitSailException; -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.exception.CommonErrorCode; -import com.bytedance.bitsail.common.util.Preconditions; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; - -import lombok.Setter; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; - -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; - -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.BACKOFF_POLICY_CONSTANT; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.BACKOFF_POLICY_EXPONENTIAL; -import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.BACKOFF_POLICY_NONE; - 
-@Setter -public class EsBulkProcessorBuilder { - - private final int bulkFlushMaxActions; - private final int bulkFlushMaxMb; - private final long bulkFlushInterval; - private final BackoffPolicy backoffPolicy; - - private RestHighLevelClient restClient; - private BulkProcessor.Listener listener; - - public EsBulkProcessorBuilder(BitSailConfiguration jobConf) { - this.bulkFlushMaxActions = jobConf.get(ElasticsearchWriterOptions.BULK_FLUSH_MAX_ACTIONS); - this.bulkFlushMaxMb = jobConf.get(ElasticsearchWriterOptions.BULK_FLUSH_MAX_SIZE_MB); - this.bulkFlushInterval = jobConf.get(ElasticsearchWriterOptions.BULK_FLUSH_INTERVAL_MS); - this.backoffPolicy = initBackoffPolicy(jobConf); - } - - public BulkProcessor build() { - Preconditions.checkNotNull(restClient, "RestHighLevelClient is not initialized!"); - Preconditions.checkNotNull(listener, "BulkProcessor listener is not initialized!"); - - BiConsumer> bulkConsumer = - ((bulkRequest, bulkResponseActionListener) -> restClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener)); - - BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); - builder.setBulkActions(bulkFlushMaxActions); - builder.setBulkSize(new ByteSizeValue(bulkFlushMaxMb, ByteSizeUnit.MB)); - builder.setFlushInterval(new TimeValue(bulkFlushInterval, TimeUnit.MILLISECONDS)); - builder.setBackoffPolicy(backoffPolicy); - - return builder.build(); - } - - private BackoffPolicy initBackoffPolicy(BitSailConfiguration jobConf) { - String backoffPolicyType = jobConf.get(ElasticsearchWriterOptions.BULK_BACKOFF_POLICY); - Integer backoffDelay = jobConf.get(ElasticsearchWriterOptions.BULK_BACKOFF_DELAY_MS); - Integer backoffRetryNum = jobConf.get(ElasticsearchWriterOptions.BULK_BACKOFF_MAX_RETRY_COUNT); - - switch (backoffPolicyType) { - case BACKOFF_POLICY_CONSTANT: - return BackoffPolicy.constantBackoff(new TimeValue(backoffDelay, TimeUnit.MILLISECONDS), backoffRetryNum); - case BACKOFF_POLICY_EXPONENTIAL: - return 
BackoffPolicy.exponentialBackoff(new TimeValue(backoffDelay, TimeUnit.MILLISECONDS), backoffRetryNum); - case BACKOFF_POLICY_NONE: - return BackoffPolicy.noBackoff(); - default: - throw new BitSailException(CommonErrorCode.CONFIG_ERROR, "Found un-recognized backoff policy type: " + backoffPolicyType); - } - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkRequestFailureHandler.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkRequestFailureHandler.java deleted file mode 100644 index 0281a0efc..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/bulk/EsBulkRequestFailureHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.rest.bulk; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; - -import org.elasticsearch.action.ActionRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class EsBulkRequestFailureHandler implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(EsBulkRequestFailureHandler.class); - - private final int ignoreFailureThreshold; - private final ConcurrentLinkedQueue bufferedRequests; - private int ignoreFailureCount; - - public EsBulkRequestFailureHandler(BitSailConfiguration jobConf) { - this.ignoreFailureThreshold = jobConf.get(ElasticsearchWriterOptions.MAX_IGNORE_FAILED_REQUEST_THRESHOLD); - this.ignoreFailureCount = 0; - this.bufferedRequests = new ConcurrentLinkedQueue<>(); - } - - public void onFailure(ActionRequest actionRequest, Throwable failure, int restStatusCode) throws Throwable { - LOG.error("Found failed action request. 
RestStatusCode:[{}], ActionRequest:[{}], Cause:[{}]", - restStatusCode, actionRequest, failure.getMessage(), failure); - bufferedRequests.add(actionRequest); - if (++ignoreFailureCount > ignoreFailureThreshold) { - throw new IOException("failed to handle request: " + actionRequest, failure); - } - } - - public List getBufferedFailedRequest() { - List requests = new ArrayList<>(bufferedRequests); - bufferedRequests.clear(); - return requests; - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchSink.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchSink.java index 63f177cea..67fb44556 100644 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchSink.java +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchSink.java @@ -21,10 +21,10 @@ import com.bytedance.bitsail.base.connector.writer.v1.WriterCommitter; import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.option.WriterOptions; import com.bytedance.bitsail.common.row.Row; import com.bytedance.bitsail.common.type.TypeInfoConverter; import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; -import com.bytedance.bitsail.connector.elasticsearch.base.EsConstants; import java.io.Serializable; import java.util.Optional; @@ -32,20 +32,24 @@ public class ElasticsearchSink implements Sink { private BitSailConfiguration writerConf; + private BitSailConfiguration commonConfiguration; @Override public String getWriterName() { - return EsConstants.ES_CONNECTOR_NAME; + return "elasticsearch"; } @Override public void configure(BitSailConfiguration commonConfiguration, 
BitSailConfiguration writerConfiguration) { - writerConf = writerConfiguration; + this.commonConfiguration = commonConfiguration; + this.writerConf = writerConfiguration; } @Override public Writer createWriter(Writer.Context context) { - return new ElasticsearchWriter<>(writerConf); + return new ElasticsearchWriter<>(context, + commonConfiguration, + writerConf.getSubConfiguration(WriterOptions.JOB_WRITER)); } @Override diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchWriter.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchWriter.java index 9db3e1a04..88d35224c 100644 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchWriter.java +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/ElasticsearchWriter.java @@ -20,14 +20,12 @@ import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRequestEmitter; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRestClientBuilder; -import com.bytedance.bitsail.connector.elasticsearch.rest.bulk.EsBulkListener; -import com.bytedance.bitsail.connector.elasticsearch.rest.bulk.EsBulkProcessorBuilder; -import com.bytedance.bitsail.connector.elasticsearch.rest.bulk.EsBulkRequestFailureHandler; +import com.bytedance.bitsail.connector.elasticsearch.format.DefaultRowSerializationSchema; +import com.bytedance.bitsail.connector.elasticsearch.sink.listener.DefaultBulkListener; +import com.bytedance.bitsail.connector.elasticsearch.sink.sender.ElasticsearchSender; +import com.bytedance.bitsail.connector.elasticsearch.utils.ElasticsearchUtils; -import 
org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,89 +33,55 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; public class ElasticsearchWriter implements Writer { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class); - private final RestHighLevelClient restClient; - private final AtomicReference failureThrowable; - private final EsBulkRequestFailureHandler failureHandler; - private final BulkProcessor bulkProcessor; - private final EsRequestEmitter emitter; - private final AtomicInteger pendingActions; - - public ElasticsearchWriter(BitSailConfiguration jobConf) { - this.restClient = new EsRestClientBuilder(jobConf).build(); - this.failureThrowable = new AtomicReference<>(); - this.failureHandler = new EsBulkRequestFailureHandler(jobConf); - this.pendingActions = new AtomicInteger(0); - - EsBulkProcessorBuilder builder = new EsBulkProcessorBuilder(jobConf); - builder.setRestClient(restClient); - builder.setListener(new EsBulkListener(failureHandler, failureThrowable, pendingActions)); - this.bulkProcessor = builder.build(); - - this.emitter = new EsRequestEmitter(jobConf); + private final DefaultRowSerializationSchema serializationSchema; + private final ElasticsearchSender elasticsearchSender; + private final DefaultBulkListener bulkListener; + private final RestHighLevelClient client; + + public ElasticsearchWriter(Context context, + BitSailConfiguration commonConfiguration, + BitSailConfiguration writerConfiguration) { + this.bulkListener = new DefaultBulkListener(context.getIndexOfSubTaskId()); + this.client = new 
RestHighLevelClient(ElasticsearchUtils.prepareRestClientBuilder(writerConfiguration)); + this.elasticsearchSender = new ElasticsearchSender(writerConfiguration, bulkListener, client); + this.serializationSchema = new DefaultRowSerializationSchema(commonConfiguration, + writerConfiguration, + context.getRowTypeInfo()); } @Override public void write(Row element) { - synchronized (this) { - checkAsyncErrorsAndRequests(); - emitter.emit(element, bulkProcessor); - pendingActions.getAndIncrement(); - } + bulkListener.checkErroneous(); + DocWriteRequest serialize = serializationSchema.serialize(element); + elasticsearchSender.bulkRequest(serialize); } - @SuppressWarnings("checkstyle:MagicNumber") @Override - public void flush(boolean endOfInput) { - synchronized (this) { - checkAsyncErrorsAndRequests(); - while (pendingActions.get() != 0) { - bulkProcessor.flush(); - checkAsyncErrorsAndRequests(); - try { - TimeUnit.MILLISECONDS.sleep(10); - } catch (Exception ignored) { - //ignore - } - } - } + public void flush(boolean endOfInput) throws IOException { + bulkListener.checkErroneous(); + elasticsearchSender.flush(); } @Override - public List prepareCommit() { - return Collections.emptyList(); + public void close() throws IOException { + elasticsearchSender.close(); + client.close(); + bulkListener.checkErroneous(); } @Override - public List snapshotState(long checkpointId) { - this.flush(false); + public List prepareCommit() { + bulkListener.checkErroneous(); return Collections.emptyList(); } @Override - public void close() throws IOException { - bulkProcessor.close(); - restClient.close(); - checkErrorAndRethrow(); - } - - private void checkErrorAndRethrow() { - Throwable cause = failureThrowable.get(); - if (Objects.nonNull(cause)) { - throw new RuntimeException("An error occurred in ElasticsearchWriter.", cause); - } - } - - private void checkAsyncErrorsAndRequests() { - checkErrorAndRethrow(); - List failedRequest = failureHandler.getBufferedFailedRequest(); - 
failedRequest.forEach(actionRequest -> emitter.emit(actionRequest, bulkProcessor)); + public List snapshotState(long checkpointId) { + bulkListener.checkErroneous(); + return Collections.emptyList(); } } diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/listener/DefaultBulkListener.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/listener/DefaultBulkListener.java new file mode 100644 index 000000000..8c9aec9bc --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/listener/DefaultBulkListener.java @@ -0,0 +1,70 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.elasticsearch.sink.listener; + +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +public class DefaultBulkListener implements BulkProcessor.Listener { + private static final Logger LOG = LoggerFactory.getLogger(DefaultBulkListener.class); + + private final int subTaskId; + + private final AtomicReference reference; + + public DefaultBulkListener(int subTaskId) { + this.subTaskId = subTaskId; + this.reference = new AtomicReference<>(); + } + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + LOG.debug("Subtask {} prepare bulk requests with execution id {}.", subTaskId, executionId); + } + + //Invoke after succeed. + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + LOG.info("Subtask {} finished bulk request with execution id {}, bulk request actions size {}(s).", subTaskId, executionId, request.requests().size()); + if (response.hasFailures()) { + String message = response.buildFailureMessage(); + LOG.error("Subtask {} bulk request actions has some failures, failure message {}.", subTaskId, message); + reference.set(new RuntimeException(message)); + } + } + + //Invoke after failed + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOG.error("Subtask {} failed bulk with execution id {}.", subTaskId, executionId, failure); + reference.set(failure); + } + + public void checkErroneous() { + Throwable throwable = reference.get(); + if (Objects.isNull(throwable)) { + return; + } + throw new RuntimeException(throwable); + } +} diff --git 
a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/sender/ElasticsearchSender.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/sender/ElasticsearchSender.java new file mode 100644 index 000000000..7ecab66d7 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/sink/sender/ElasticsearchSender.java @@ -0,0 +1,100 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.bytedance.bitsail.connector.elasticsearch.sink.sender; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchOptions; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +public class ElasticsearchSender implements Serializable, Closeable { + + private final BitSailConfiguration configuration; + + private final BulkProcessor bulkProcessor; + private final BulkProcessor.Listener listener; + + public ElasticsearchSender(BitSailConfiguration configuration, + BulkProcessor.Listener listener, + RestHighLevelClient client) { + this.configuration = configuration; + this.listener = listener; + this.bulkProcessor = prepareBulkProcessor(configuration, client, listener); + } + + private static BulkProcessor prepareBulkProcessor(BitSailConfiguration configuration, + RestHighLevelClient client, + BulkProcessor.Listener listener) { + + BiConsumer> bulkConsumer = + ((bulkRequest, bulkResponseActionListener) -> client + .bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener)); + + BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); + //make flush blocking + builder.setConcurrentRequests(0); + + Integer bulkMaxBatchCount = 
configuration.get(ElasticsearchOptions.BULK_MAX_BATCH_COUNT); + Integer bulkMaxBatchSize = configuration.get(ElasticsearchOptions.BULK_MAX_BATCH_SIZE); + Long flushInterval = configuration.get(ElasticsearchOptions.BULK_FLUSH_INTERVAL_MS); + + builder.setBulkActions(bulkMaxBatchCount) + .setBulkSize(new ByteSizeValue(bulkMaxBatchSize, ByteSizeUnit.MB)) + .setFlushInterval(TimeValue.timeValueMillis(flushInterval)) + .setBackoffPolicy( + BackoffPolicy.exponentialBackoff( + new TimeValue(configuration.get(ElasticsearchOptions.BULK_BACKOFF_DELAY_MS), + TimeUnit.MILLISECONDS), + configuration.get(ElasticsearchOptions.BULK_MAX_RETRY_COUNT)) + ); + + return builder.build(); + } + + public void bulkRequest(DocWriteRequest docWriteRequest) { + bulkProcessor.add(docWriteRequest); + } + + public void flush() { + bulkProcessor.flush(); + } + + @Override + public void close() throws IOException { + if (Objects.nonNull(bulkProcessor)) { + bulkProcessor.flush(); + bulkProcessor.close(); + } + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/utils/ElasticsearchUtils.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/utils/ElasticsearchUtils.java new file mode 100644 index 000000000..05090941b --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/utils/ElasticsearchUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.utils; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.constants.Elasticsearchs; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchOptions; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class ElasticsearchUtils { + + public static RestClientBuilder prepareRestClientBuilder(BitSailConfiguration configuration) { + String hosts = configuration.get(ElasticsearchOptions.HOSTS); + + List httpHosts = Arrays.stream(StringUtils.split(hosts, Elasticsearchs.COMMA)) + .map(HttpHost::create) + .collect(Collectors.toList()); + RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[] {})); + + String username = configuration.get(ElasticsearchOptions.USERNAME); + String password = configuration.get(ElasticsearchOptions.PASSWORD); + if (StringUtils.isNotEmpty(username) + || StringUtils.isNotEmpty(password)) { + + CredentialsProvider provider = new BasicCredentialsProvider(); + provider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username, password)); + builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(provider)); + } + + String pathPrefix = configuration.get(ElasticsearchOptions.PATH_PREFIX); + if (StringUtils.isNotEmpty(pathPrefix)) { + builder.setPathPrefix(pathPrefix); + } + + int connectionRequestTimeout = configuration.get(ElasticsearchOptions.CONNECTION_REQUEST_TIMEOUT_MS); + int connectionTimeout = configuration.get(ElasticsearchOptions.CONNECTION_TIMEOUT_MS); + int socketTimeout = configuration.get(ElasticsearchOptions.SOCKET_TIMEOUT_MS); + + builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder + .setConnectionRequestTimeout(connectionRequestTimeout) + .setConnectTimeout(connectionTimeout) + .setSocketTimeout(socketTimeout)); + + return builder; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/base/NetUtilTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/base/NetUtilTest.java deleted file mode 100644 index a451f6c9d..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/base/NetUtilTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.base; - -import org.junit.Assert; -import org.junit.Test; - -public class NetUtilTest { - - @Test - public void testIpv4() { - String ipv4Address = "127.0.0.1:1234"; - Assert.assertTrue(NetUtil.isIpv4Address(ipv4Address)); - Assert.assertFalse(NetUtil.isIpv6Address(ipv4Address)); - - Assert.assertEquals("127.0.0.1", NetUtil.getIpv4Ip(ipv4Address)); - Assert.assertEquals(1234, NetUtil.getIpv4Port(ipv4Address)); - } - - @Test - public void testIpv6() { - String ipv6Address = "[2001:db8:3333:4444:5555:6666:7777:8888]:1234"; - Assert.assertTrue(NetUtil.isIpv6Address(ipv6Address)); - Assert.assertFalse(NetUtil.isIpv4Address(ipv6Address)); - - Assert.assertEquals("[2001:db8:3333:4444:5555:6666:7777:8888]", NetUtil.getIpv6Ip(ipv6Address)); - Assert.assertEquals(1234, NetUtil.getIpv6Port(ipv6Address)); - } - - @Test - public void testAlias() { - String ipv4Address = "localhost:1234"; - Assert.assertTrue(NetUtil.isIpv4Address(ipv4Address)); - Assert.assertFalse(NetUtil.isIpv6Address(ipv4Address)); - - Assert.assertEquals("localhost", NetUtil.getIpv4Ip(ipv4Address)); - Assert.assertEquals(1234, NetUtil.getIpv4Port(ipv4Address)); - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocConstructorTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocConstructorTest.java deleted file mode 100644 index c4b1b8923..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocConstructorTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.doc; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.doc.parameter.EsDocParameters; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRequestEmitter; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.nio.file.Paths; - -public class EsDocConstructorTest { - - private final Object[] fields = new Object[] { - 100, - "varchar", - "text", - "bigint", - "20220810", - "es_index_20220810", - "index", - 10 - }; - private final Row row = new Row(fields); - - @Test - public void testConstructDoc() throws Exception { - BitSailConfiguration jobConf = BitSailConfiguration.from(new File( - Paths.get(getClass().getClassLoader().getResource("es_doc_parameter_test.json").toURI()).toString())); - jobConf.set(ElasticsearchWriterOptions.DOC_EXCLUDE_FIELDS, - "op_type,version"); - - EsDocParameters parameters = EsRequestEmitter.initEsDocParams(jobConf); - EsDocConstructor constructor = new EsDocConstructor(parameters); - String doc = constructor.form(row); - - JSONObject jsonObject = JSON.parseObject(doc); - Assert.assertEquals("20220810", jsonObject.getString("date")); - Assert.assertEquals("text", jsonObject.getString("text_type")); - Assert.assertEquals("varchar", 
jsonObject.getString("varchar_type")); - Assert.assertEquals("bigint", jsonObject.getString("bigint_type")); - Assert.assertEquals(100, jsonObject.getInteger("id").intValue()); - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocParameterTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocParameterTest.java deleted file mode 100644 index 97d981582..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsDocParameterTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.bytedance.bitsail.connector.elasticsearch.doc; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.elasticsearch.doc.parameter.EsDocParameters; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRequestEmitter; - -import com.alibaba.fastjson.serializer.SerializerFeature; -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.nio.file.Paths; - -public class EsDocParameterTest { - - @Test - public void testInitializeEsDocParams() throws Exception { - BitSailConfiguration jobConf = BitSailConfiguration.from(new File( - Paths.get(getClass().getClassLoader().getResource("es_doc_parameter_test.json").toURI()).toString())); - EsDocParameters params = EsRequestEmitter.initEsDocParams(jobConf); - - Assert.assertEquals(0, params.getIdFieldsIndices().get(0).intValue()); - Assert.assertEquals(4, params.getIdFieldsIndices().get(1).intValue()); - Assert.assertEquals(1, params.getExcludedFieldsIndices().get(0).intValue()); - Assert.assertEquals(0, params.getRoutingFieldsIndices().get(0).intValue()); - - Assert.assertEquals(",", params.getIdDelimiter()); - Assert.assertEquals(5, params.getDynamicFieldIndex().intValue()); - Assert.assertEquals(6, params.getOpTypeIndex().intValue()); - Assert.assertEquals(7, params.getVersionIndex().intValue()); - - Assert.assertTrue(params.isIgnoreBlankValue()); - Assert.assertFalse(params.isFlattenMap()); - Assert.assertEquals(SerializerFeature.QuoteFieldNames, params.getJsonFeatures().get(0)); - Assert.assertEquals(SerializerFeature.UseSingleQuotes, params.getJsonFeatures().get(1)); - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsKeySelectorTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsKeySelectorTest.java deleted file mode 100644 index 54a4aeb9e..000000000 --- 
a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsKeySelectorTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.doc; - -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.doc.tools.EsKeySelector; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; - -public class EsKeySelectorTest { - - @Test - public void testKeySelector() { - List idFieldIndices = Arrays.asList(0, 1); - String delimiter = ","; - EsKeySelector selector = new EsKeySelector(idFieldIndices, delimiter); - - String[] fields = new String[] {"A", "B", "C", "D"}; - Row row = new Row(fields); - - Assert.assertEquals("A,B", selector.getKey(row)); - } -} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsRequestConstructorTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsRequestConstructorTest.java deleted file mode 100644 index b7a36a536..000000000 --- a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/doc/EsRequestConstructorTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 
2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.elasticsearch.doc; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.doc.parameter.EsDocParameters; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRequestEmitter; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.nio.file.Paths; - -public class EsRequestConstructorTest { - - private final Object[] fields = new Object[] { - 100, - "varchar", - "text", - "bigint", - "20220810", - "es_index_20220810", - "upsert", - 10 - }; - private final Row row = new Row(fields); - - @Test - public void testCreateRequest() throws Exception { - BitSailConfiguration jobConf = BitSailConfiguration.from(new File( - Paths.get(getClass().getClassLoader().getResource("es_doc_parameter_test.json").toURI()).toString())); - jobConf.set(ElasticsearchWriterOptions.DOC_EXCLUDE_FIELDS, - "op_type,version"); - - EsDocParameters parameters = EsRequestEmitter.initEsDocParams(jobConf); - EsRequestConstructor constructor = new EsRequestConstructor(jobConf, parameters); - ActionRequest 
actionRequest = constructor.createRequest(row); - Assert.assertTrue(actionRequest instanceof UpdateRequest); - UpdateRequest request = (UpdateRequest) actionRequest; - Assert.assertEquals("es_index_20220810", request.index()); - Assert.assertEquals("_doc", request.type()); - } -} diff --git a/bitsail-test/bitsail-test-end-to-end/bitsail-test-e2e-connector-v1/bitsail-test-e2e-connector-v1-elasticsearch/src/test/java/com/bytedance/bitsail/test/e2e/datasource/ElasticsearchDataSource.java b/bitsail-test/bitsail-test-end-to-end/bitsail-test-e2e-connector-v1/bitsail-test-e2e-connector-v1-elasticsearch/src/test/java/com/bytedance/bitsail/test/e2e/datasource/ElasticsearchDataSource.java index 8a904ae79..b575f847e 100644 --- a/bitsail-test/bitsail-test-end-to-end/bitsail-test-e2e-connector-v1/bitsail-test-e2e-connector-v1-elasticsearch/src/test/java/com/bytedance/bitsail/test/e2e/datasource/ElasticsearchDataSource.java +++ b/bitsail-test/bitsail-test-end-to-end/bitsail-test-e2e-connector-v1/bitsail-test-e2e-connector-v1-elasticsearch/src/test/java/com/bytedance/bitsail/test/e2e/datasource/ElasticsearchDataSource.java @@ -20,7 +20,7 @@ import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.exception.CommonErrorCode; import com.bytedance.bitsail.common.option.WriterOptions; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchOptions; import com.bytedance.bitsail.connector.elasticsearch.sink.ElasticsearchSink; import lombok.SneakyThrows; @@ -46,7 +46,6 @@ import java.io.IOException; import java.time.Duration; import java.time.temporal.ChronoUnit; -import java.util.Collections; public class ElasticsearchDataSource extends AbstractDataSource { private static final Logger LOG = LoggerFactory.getLogger(AbstractDataSource.class); @@ -91,9 +90,9 @@ public boolean accept(BitSailConfiguration jobConf, Role role) { @Override 
public void modifyJobConf(BitSailConfiguration jobConf) { - jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, - Collections.singletonList(getInternalHttpHostAddress())); - jobConf.set(ElasticsearchWriterOptions.ES_INDEX, ES_INDEX); + jobConf.setWriter(ElasticsearchOptions.HOSTS, + getInternalHttpHostAddress()); + jobConf.setWriter(ElasticsearchOptions.INDEX, ES_INDEX); } @SuppressWarnings("checkstyle:MagicNumber") diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/pom.xml b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/pom.xml index e485df88b..b980e8a5a 100644 --- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/pom.xml +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/pom.xml @@ -63,5 +63,12 @@ ${elasticsearch.version} test + + + org.apache.logging.log4j + log4j-to-slf4j + 2.11.1 + + \ No newline at end of file diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchSinkITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchSinkITCase.java index 9ffc34e6b..3aeeab933 100644 --- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchSinkITCase.java +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchSinkITCase.java @@ -16,10 +16,12 @@ package com.bytedance.bitsail.test.integration.elasticsearch; +import com.bytedance.bitsail.base.execution.Mode; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.option.CommonOptions; -import 
com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRestClientBuilder; +import com.bytedance.bitsail.common.option.WriterOptions; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchOptions; +import com.bytedance.bitsail.connector.elasticsearch.utils.ElasticsearchUtils; import com.bytedance.bitsail.connector.fake.option.FakeReaderOptions; import com.bytedance.bitsail.test.integration.AbstractIntegrationTest; import com.bytedance.bitsail.test.integration.elasticsearch.container.ElasticsearchCluster; @@ -36,77 +38,75 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.util.Collections; - @SuppressWarnings("checkstyle:MagicNumber") public class ElasticsearchSinkITCase extends AbstractIntegrationTest { - private final int totalCount = 300; + private static final int TOTAL_COUNT = 300; private static final String INDEX = "es_index_test"; - private static ElasticsearchCluster esCluster; + private static ElasticsearchCluster cluster; private final CountRequest countRequest = new CountRequest(INDEX); private RestHighLevelClient client; @BeforeClass - public static void prepareEsCluster() throws Exception { - esCluster = new ElasticsearchCluster(); - esCluster.startService(); - esCluster.checkClusterHealth(); + public static void beforeClass() throws Exception { + cluster = new ElasticsearchCluster(); + cluster.startService(); + cluster.checkClusterHealth(); } @Before - public void initIndex() { - esCluster.resetIndex(INDEX); + public void before() { + cluster.resetIndex(INDEX); BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); - jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, - Collections.singletonList(esCluster.getHttpHostAddress())); - client = new EsRestClientBuilder(jobConf).build(); + jobConf.setWriter(ElasticsearchOptions.HOSTS, + cluster.getHttpHostAddress()); + client = new RestHighLevelClient(ElasticsearchUtils + 
.prepareRestClientBuilder(jobConf.getSubConfiguration(WriterOptions.JOB_WRITER))); } @After - public void closeClient() throws Exception { + public void after() throws Exception { client.close(); } @AfterClass - public static void closeEsCluster() { - esCluster.close(); + public static void afterClass() { + cluster.close(); } @Test public void testBatch() throws Exception { BitSailConfiguration jobConf = JobConfUtils.fromClasspath("es_sink_test.json"); - jobConf.set(FakeReaderOptions.TOTAL_COUNT, totalCount); + jobConf.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_COUNT); jobConf.set(FakeReaderOptions.RATE, 1000); - jobConf.set(ElasticsearchWriterOptions.ES_INDEX, INDEX); - jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, - Collections.singletonList(esCluster.getHttpHostAddress())); + jobConf.setWriter(ElasticsearchOptions.INDEX, INDEX); + jobConf.setWriter(ElasticsearchOptions.HOSTS, cluster.getHttpHostAddress()); submitJob(jobConf); - esCluster.flush(); + cluster.flush(); CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT); - Assert.assertEquals(totalCount, countResponse.getCount()); + Assert.assertEquals(TOTAL_COUNT, countResponse.getCount()); } @Test public void testStreaming() throws Exception { BitSailConfiguration jobConf = JobConfUtils.fromClasspath("es_sink_test.json"); - jobConf.set(CommonOptions.JOB_TYPE, "STREAMING"); + jobConf.set(CommonOptions.JOB_TYPE, Mode.STREAMING.name()); jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_ENABLE, true); - jobConf.set(FakeReaderOptions.TOTAL_COUNT, totalCount); - jobConf.set(FakeReaderOptions.RATE, totalCount / 10); - jobConf.set(ElasticsearchWriterOptions.ES_INDEX, INDEX); - jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, - Collections.singletonList(esCluster.getHttpHostAddress())); + jobConf.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_COUNT); + jobConf.set(FakeReaderOptions.RATE, TOTAL_COUNT / 10); + + jobConf.setWriter(ElasticsearchOptions.INDEX, INDEX); + 
jobConf.setWriter(ElasticsearchOptions.HOSTS, cluster.getHttpHostAddress()); submitJob(jobConf); - esCluster.flush(); + cluster.flush(); CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT); - Assert.assertEquals(totalCount, countResponse.getCount()); + Assert.assertEquals(TOTAL_COUNT, countResponse.getCount()); } } diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchWriterITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchWriterITCase.java index db63062d1..679bcaf19 100644 --- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchWriterITCase.java +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/java/com/bytedance/bitsail/test/integration/elasticsearch/ElasticsearchWriterITCase.java @@ -16,24 +16,28 @@ package com.bytedance.bitsail.test.integration.elasticsearch; +import com.bytedance.bitsail.base.connector.writer.v1.Writer; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.option.WriterOptions; import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; -import com.bytedance.bitsail.connector.elasticsearch.rest.EsRestClientBuilder; -import com.bytedance.bitsail.connector.elasticsearch.rest.bulk.EsBulkListener; -import com.bytedance.bitsail.connector.elasticsearch.rest.bulk.EsBulkProcessorBuilder; -import com.bytedance.bitsail.connector.elasticsearch.rest.bulk.EsBulkRequestFailureHandler; +import com.bytedance.bitsail.common.row.RowKind; +import 
com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.common.typeinfo.RowTypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfoUtils; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchOptions; import com.bytedance.bitsail.connector.elasticsearch.sink.ElasticsearchWriter; +import com.bytedance.bitsail.connector.elasticsearch.utils.ElasticsearchUtils; import com.bytedance.bitsail.test.integration.elasticsearch.container.ElasticsearchCluster; import com.bytedance.bitsail.test.integration.utils.JobConfUtils; -import org.elasticsearch.action.bulk.BulkProcessor; +import com.google.common.collect.Maps; +import org.apache.commons.collections.MapUtils; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -41,114 +45,129 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; public class ElasticsearchWriterITCase { - private final String docString = "{\n" + - " \"date\":\"20220810\",\n" + - " \"text_type\":\"text\",\n" + - " \"varchar_type\":\"varchar\",\n" + - " \"bigint_type\":\"bigint\",\n" + - " \"id\":100\n" + - "}"; - private final Row row = new Row(new Object[] { - "test_id", "varchar", "text", "bigint", "20220810" - }); - private final String id = "test_id"; - private final String index = "es_index_test"; - - private static ElasticsearchCluster esCluster; - private RestHighLevelClient client; 
+ private static ElasticsearchCluster cluster; + private static final String INDEX_NAME = "elasticsearch_index"; + private RestHighLevelClient client; private BitSailConfiguration jobConf; - private IndexRequest indexRequest = new IndexRequest().index(index).id(id); - private GetRequest getRequest = new GetRequest().index(index).id(id); @BeforeClass - public static void prepareEsCluster() throws Exception { - esCluster = new ElasticsearchCluster(); - esCluster.startService(); - esCluster.checkClusterHealth(); + public static void beforeClass() throws Exception { + cluster = new ElasticsearchCluster(); + cluster.startService(); + cluster.checkClusterHealth(); } @Before - public void initIndex() { - esCluster.resetIndex(index); - + public void before() { + cluster.resetIndex(INDEX_NAME); jobConf = BitSailConfiguration.newDefault(); - jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, - Collections.singletonList(esCluster.getHttpHostAddress())); - - client = new EsRestClientBuilder(jobConf).build(); + jobConf.setWriter(ElasticsearchOptions.HOSTS, cluster.getHttpHostAddress()); + client = new RestHighLevelClient(ElasticsearchUtils.prepareRestClientBuilder(jobConf.getSubConfiguration(WriterOptions.JOB_WRITER))); } @After - public void closeClient() throws Exception { + public void after() throws Exception { client.close(); } @AfterClass - public static void closeEsCluster() { - esCluster.close(); + public static void afterClass() { + cluster.close(); } @Test - public void testRestClientBuilder() throws Exception { - indexRequest.source(docString, XContentType.JSON); - client.index(indexRequest, RequestOptions.DEFAULT); - check(client, 100); + public void testWriteWithRowKind() throws Exception { + BitSailConfiguration writerConfiguration = JobConfUtils.fromClasspath("es_writer_parameter_test.json"); + writerConfiguration.merge(jobConf, true); + writerConfiguration.setWriter(ElasticsearchOptions.INDEX, INDEX_NAME); + TypeInfoConverter fileMappingTypeInfoConverter = new 
BitSailTypeInfoConverter(); + RowTypeInfo rowTypeInfo = TypeInfoUtils.getRowTypeInfo(fileMappingTypeInfoConverter, writerConfiguration.get(WriterOptions.BaseWriterOptions.COLUMNS)); + ElasticsearchWriter writer = new ElasticsearchWriter<>( + new WriterMockContext(rowTypeInfo), + BitSailConfiguration.newDefault(), + writerConfiguration.getSubConfiguration(WriterOptions.JOB_WRITER)); + + long id = 1; + writer.write(mockRow(RowKind.INSERT, id)); + writer.flush(false); + checkDocumentExistsOrNot(client, INDEX_NAME, id, true); + writer.write(mockRow(RowKind.DELETE, id)); + writer.flush(false); + checkDocumentExistsOrNot(client, INDEX_NAME, id, false); + id = 2; + writer.write(mockRow(RowKind.UPDATE_AFTER, id)); + writer.flush(false); + checkDocumentExistsOrNot(client, INDEX_NAME, id, true); + writer.write(mockRow(RowKind.UPDATE_BEFORE, id)); + writer.flush(false); + checkDocumentExistsOrNot(client, INDEX_NAME, id, false); + + //Null value could write in INSERT MODE + Object nullId = null; + writer.write(mockRow(RowKind.INSERT, nullId)); + writer.flush(false); } - @Test - public void testBulkProcessor() throws Exception { - AtomicReference failureThrowable = new AtomicReference<>(); - EsBulkRequestFailureHandler failureHandler = new EsBulkRequestFailureHandler(jobConf); - AtomicInteger pendingActions = new AtomicInteger(0); - - EsBulkProcessorBuilder builder = new EsBulkProcessorBuilder(jobConf); - builder.setRestClient(client); - builder.setListener(new EsBulkListener(failureHandler, failureThrowable, pendingActions)); - BulkProcessor bulkProcessor = builder.build(); - - indexRequest.source(docString, XContentType.JSON); - pendingActions.incrementAndGet(); - bulkProcessor.add(indexRequest); - bulkProcessor.flush(); - while (pendingActions.get() > 0) { - bulkProcessor.awaitClose(10, TimeUnit.SECONDS); + private static void checkDocumentExistsOrNot(RestHighLevelClient client, + String index, + Object id, + boolean exists) throws Exception { + GetRequest getRequest = new 
GetRequest() + .index(index) + .id(id.toString()); + GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + + Map source = response.getSourceAsMap(); + if (exists) { + Assert.assertTrue(MapUtils.isNotEmpty(source)); + } else { + Assert.assertTrue(MapUtils.isEmpty(source)); } - Assert.assertEquals(0, pendingActions.get()); - check(client, 100); } - @Test - public void testEsWriter() throws Exception { - BitSailConfiguration bitSailConf = JobConfUtils.fromClasspath("es_writer_parameter_test.json"); - bitSailConf.merge(jobConf, true); - bitSailConf.set(ElasticsearchWriterOptions.ES_INDEX, index); + private static Row mockRow(RowKind rowKind, Object id) { + Map values = Maps.newHashMap(); + values.put("k", "v"); + Row row = new Row(new Object[] { + id, "varchar", "text", "bigint", "20220810" + }); + row.setKind(rowKind); + return row; + } - ElasticsearchWriter writer = new ElasticsearchWriter<>(bitSailConf); - writer.write(row); - writer.flush(false); - writer.close(); + public static class WriterMockContext implements Writer.Context { - check(client, id); - } + private RowTypeInfo rowTypeInfo; - private void check(RestHighLevelClient client, Object id) throws Exception { - GetResponse response = client.get(getRequest, RequestOptions.DEFAULT); + public WriterMockContext(RowTypeInfo rowTypeInfo) { + this.rowTypeInfo = rowTypeInfo; + } - Map source = response.getSource(); - Assert.assertEquals("20220810", source.get("date")); - Assert.assertEquals("text", source.get("text_type")); - Assert.assertEquals("varchar", source.get("varchar_type")); - Assert.assertEquals("bigint", source.get("bigint_type")); - Assert.assertEquals(id, source.get("id")); + @Override + public RowTypeInfo getRowTypeInfo() { + return rowTypeInfo; + } + + @Override + public int getIndexOfSubTaskId() { + return 0; + } + + @Override + public boolean isRestored() { + return false; + } + + @Override + public List getRestoreStates() { + return null; + } } } diff --git 
a/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/resources/logback.xml b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/resources/logback.xml new file mode 100644 index 000000000..f7f7f119a --- /dev/null +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-elasticsearch/src/test/resources/logback.xml @@ -0,0 +1,28 @@ + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file