Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatible with versions 5, 6 and 7 with es restclient #66

Open
wants to merge 2 commits into
base: 0.5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions modules/executor/engine/datax/datax-elasticsearchwriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public void init() {
}
@Override
public void prepare() {
ElasticRestClient restClient;
ElasticRestHighClient restClient;
Map<String, Object> clientConfig = jobConf.getMap(ElasticKey.CLIENT_CONFIG);
if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)){
restClient = ElasticRestClient.custom(endPoints, userName,
restClient = ElasticRestHighClient.custom(endPoints, userName,
password, clientConfig);
}else{
restClient = ElasticRestClient.custom(endPoints, clientConfig);
restClient = ElasticRestHighClient.custom(endPoints, clientConfig);
}
String indexName = this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE);
String indexType = this.jobConf.getString(ElasticKey.INDEX_TYPE, "");
Expand Down Expand Up @@ -136,7 +136,7 @@ private void validateParams(){
this.jobConf.getNecessaryValue(ElasticKey.INDEX_NAME, ElasticWriterErrorCode.REQUIRE_VALUE);
}

private Map<Object, Object> resolveColumn(ElasticRestClient client,
private Map<Object, Object> resolveColumn(ElasticRestHighClient client,
String index, String type ,
List<Object> rawColumnList, List<ElasticColumn> outputColumn,
String columnNameSeparator){
Expand Down Expand Up @@ -192,16 +192,16 @@ private void resolveColumn(List<ElasticColumn> outputColumn, ElasticColumn colum
levelColumn.setFormat(String.valueOf(metaMap.get(ElasticKey.PROPS_COLUMN_FORMAT)));
}
outputColumn.add(levelColumn);
}else if(null != metaMap.get(ElasticRestClient.FIELD_PROPS)
&& metaMap.get(ElasticRestClient.FIELD_PROPS) instanceof Map){
}else if(null != metaMap.get(ElasticRestHighClient.FIELD_PROPS)
&& metaMap.get(ElasticRestHighClient.FIELD_PROPS) instanceof Map){
ElasticColumn levelColumn = column;
if(null == levelColumn){
levelColumn = new ElasticColumn();
levelColumn.setName(String.valueOf(key));
}else{
levelColumn.setName(levelColumn.getName() + columnNameSeparator + key);
}
resolveColumn(outputColumn, levelColumn, (Map)metaMap.get(ElasticRestClient.FIELD_PROPS),
resolveColumn(outputColumn, levelColumn, (Map)metaMap.get(ElasticRestHighClient.FIELD_PROPS),
columnNameSeparator);
}
}
Expand All @@ -218,14 +218,14 @@ public static class Task extends Writer.Task{
private String typeName;
private String columnNameSeparator = ElasticColumn.DEFAULT_NAME_SPLIT;
private List<ElasticColumn> columns;
private ElasticRestClient restClient;
private ElasticRestHighClient restClient;
private BulkProcessor bulkProcessor;

@Override
public void init() {
this.taskConf = super.getPluginJobConf();
indexName = this.taskConf.getString(ElasticKey.INDEX_NAME);
typeName = this.taskConf.getString(ElasticKey.INDEX_TYPE, ElasticRestClient.MAPPING_TYPE_DEFAULT);
typeName = this.taskConf.getString(ElasticKey.INDEX_TYPE, ElasticRestHighClient.MAPPING_TYPE_DEFAULT);
columnNameSeparator = this.taskConf.getString(ElasticKey.COLUMN_NAME_SEPARATOR, ElasticColumn.DEFAULT_NAME_SPLIT);
int batchSize = this.taskConf.getInt(ElasticKey.BULK_ACTIONS, 1000);
int bulkPerTask = this.taskConf.getInt(ElasticKey.BULK_PER_TASK, 1);
Expand All @@ -242,10 +242,10 @@ public void init() {
}
String[] endPoints = this.taskConf.getString(ElasticKey.ENDPOINTS).split(DEFAULT_ENDPOINT_SPLIT);
if(StringUtils.isNotBlank(userName) && StringUtils.isNotBlank(password)){
restClient = ElasticRestClient.custom(endPoints, userName,
restClient = ElasticRestHighClient.custom(endPoints, userName,
password, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG));
}else{
restClient = ElasticRestClient.custom(endPoints, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG));
restClient = ElasticRestHighClient.custom(endPoints, this.taskConf.getMap(ElasticKey.CLIENT_CONFIG));
}
this.bulkProcessor = restClient.createBulk(buildListener(getTaskPluginCollector()), batchSize, bulkPerTask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public enum ElasticWriterErrorCode implements ErrorCode {
MAPPING_TYPE_UNSUPPORTED("ESWriter-08", "Unsupported mapping type"),
BULK_REQ_ERROR("ESWriter-09", "Bulk request error"),
INDEX_NOT_EXIST("ESWriter-10", "Index not exist"),
CONFIG_ERROR("ESWriter-11", "Config error");
CONFIG_ERROR("ESWriter-11", "Config error"),
ES_VERSION("ESWriter-12", "Cannot get Elasticsearch version"),
FEATURES_ERROR("ESWriter-13", "Function not supported");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,75 @@ public void setFormat(String format) {
this.format = format;
}

public static String recordToString(Record record, List<ElasticColumn> colConfs, String columnNameSeparator){
String sql1="{%s}";
boolean first=false;

StringBuffer sb=new StringBuffer();
for(int i = 0; i < record.getColumnNumber(); i++){
Column column = record.getColumn(i);
ElasticColumn config = colConfs.get(i);
String columnName = config.getName();
if(first){
sb.append(",");
}
first = true;
sb.append("\"").append(columnName).append("\":");
ElasticFieldDataType type = ElasticFieldDataType.valueOf(config.getType().toUpperCase());
switch(type){
case IP:
case IP_RANGE:
case KEYWORD:
case TEXT:
sb.append("\"").append(column.asString()).append("\"");;
break;
case GEO_POINT:
case GEO_SHAPE:
case NESTED:
case OBJECT:
sb.append("\"").append(parseObject(column.asString())).append("\"");
break;
case LONG_RANGE:
case LONG:
sb.append(column.asLong());
break;
case INTEGER:
case INTEGER_RANGE:
case SHORT:
sb.append(column.asBigInteger());
break;
case FLOAT:
case FLOAT_RANGE:
case HALF_FLOAT:
case SCALED_FLOAT:
case DOUBLE_RANGE:
case DOUBLE:
sb.append(column.asDouble());
break;
case BINARY:
case BYTE:
sb.append("\"").append(column.asString()).append("\"");
break;
case BOOLEAN:
sb.append(column.asBoolean());
break;
case DATE_RANGE:
case DATE:
sb.append("\"").append(parseDate(config, column)).append("\"");
break;
default:
throw DataXException.asDataXException(ElasticWriterErrorCode.MAPPING_TYPE_UNSUPPORTED,
"unsupported type:[" +config.getType() + "]");
}
}
String sql2=sb.toString();
if(StringUtils.isNoneBlank(sql2)){
return String.format(sql1, sql2);
}else{
return null;
}
}

public static Map<String, Object> toData(Record record, List<ElasticColumn> colConfs, String columnNameSeparator){
Map<String, Object> outputData = new HashMap<>(record.getColumnNumber());
for(int i = 0; i < record.getColumnNumber(); i++){
Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
Expand Down