Skip to content

Commit

Permalink
Merge pull request #52 from zilliztech/opensearch
Browse files Browse the repository at this point in the history
opensearch
nianliuu authored Feb 8, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 677c1f3 + e3021a4 commit 79ac9e3
Showing 7 changed files with 135 additions and 14 deletions.
11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -68,5 +68,16 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>2.29.47</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.github.acm19</groupId>
<artifactId>aws-request-signing-apache-interceptor</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -17,11 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;

import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor;
import org.apache.http.Header;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.message.BasicHeader;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.KNN_VECTOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.VECTOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.IAM;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -80,6 +82,11 @@
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.regions.Region;

@Slf4j
public class EsRestClient implements Closeable {
@@ -89,12 +96,14 @@ public class EsRestClient implements Closeable {
private static final int SOCKET_TIMEOUT = 5 * 60 * 1000;

private final RestClient restClient;
private static ReadonlyConfig connConfig;

private EsRestClient(RestClient restClient) {
this.restClient = restClient;
}

public static EsRestClient createInstance(ReadonlyConfig config) {
connConfig = config;
List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
Optional<String> cloudId = config.getOptional(EsClusterConnectionConfig.CLOUD_ID);
Optional<String> username = config.getOptional(EsClusterConnectionConfig.USERNAME);
@@ -200,6 +209,23 @@ private static RestClientBuilder getRestClientBuilder(
httpClientBuilder.addInterceptorFirst((HttpRequestInterceptor) (request, context) -> {
request.addHeader(apiKeyHeader);
});
}else{
Map<String, String> iam = connConfig.get(IAM);
AwsCredentialsProvider awsCredentials = StaticCredentialsProvider.create(
AwsBasicCredentials.create(iam.get("access_key"), iam.get("secret_key"))
);
// AWS Region & Service
String region = iam.get("region"); // Update to match your OpenSearch region
String serviceName = iam.get("service_name"); // Use "es" for Managed OpenSearch, "aoss" for Serverless

// OpenSearch Host (WITHOUT "https://")
HttpRequestInterceptor awsSigV4Interceptor = new AwsRequestSigningApacheInterceptor(
serviceName,
AwsV4HttpSigner.create(),
awsCredentials,
region
);
httpClientBuilder.addInterceptorLast(awsSigV4Interceptor);
}

try {
@@ -328,6 +354,28 @@ public ScrollResult searchByScroll(
String endpoint = "/" + index + "/_search?scroll=" + scrollTime;
return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
}
public ScrollResult search(String index, List<String> source, Map<String, Object> query, List<Object> searchAfter, int pageSize) {
Map<String, Object> param = new HashMap<>();
param.put("query", query);
param.put("_source", source);
param.put("size", pageSize); // ✅ Use pageSize dynamically
param.put("sort", List.of(
Map.of("_id", "asc")
)); // ✅ Stable sorting

if (searchAfter != null && !searchAfter.isEmpty()) {
param.put("search_after", searchAfter); // ✅ Add `search_after` only if available
}

String endpoint = "/" + index + "/_search";

try {
return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
} catch (Exception e) {
System.err.println("Error during search_after request: " + e.getMessage());
return null;
}
}

/**
* scroll to get result call _search/scroll
@@ -383,7 +431,7 @@ private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBod

private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
ScrollResult scrollResult = new ScrollResult();
String scrollId = responseJson.get("_scroll_id").asText();
String scrollId = responseJson.get("_scroll_id") == null ? null : responseJson.get("_scroll_id").asText();
scrollResult.setScrollId(scrollId);

JsonNode hitsNode = responseJson.get("hits").get("hits");
@@ -407,6 +455,23 @@ private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
}
docs.add(doc);
}
if (hitsNode.isArray() && !hitsNode.isEmpty()) {
JsonNode lastSortNode = hitsNode.get(hitsNode.size() - 1).get("sort"); // Get last "sort"
// Convert to List<Object>
List<Object> lastSortList = new ArrayList<>();
if (lastSortNode != null && lastSortNode.isArray()) {
for (JsonNode valueNode : lastSortNode) {
if (valueNode.isNumber()) {
lastSortList.add(valueNode.numberValue()); // Add as Integer/Long/Double
} else if (valueNode.isTextual()) {
lastSortList.add(valueNode.textValue()); // Add as String
} else {
lastSortList.add(valueNode.toString()); // Fallback
}
}
}
scrollResult.setLastSort(lastSortList);
}
return scrollResult;
}

Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;
import java.util.Map;

public class EsClusterConnectionConfig {

@@ -35,6 +36,11 @@ public class EsClusterConnectionConfig {
.stringType()
.noDefaultValue()
.withDescription("Elasticsearch cloud id");
public static final Option<Map<String, String>> IAM =
Options.key("iam")
.mapType()
.noDefaultValue()
.withDescription("OpenSearch service name");

public static final Option<String> USERNAME =
Options.key("username")
Original file line number Diff line number Diff line change
@@ -26,5 +26,6 @@
public class ScrollResult {

private String scrollId;
private List<Object> lastSort;
private List<Map<String, Object>> docs;
}
Original file line number Diff line number Diff line change
@@ -98,6 +98,8 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
put(
"yyyy-MM-dd HH:mm:ss.SSSSSSSSS".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"));
put("2025-02-07 03:06:16.693985+00:00".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSXXX"));
}
};

@@ -250,6 +252,10 @@ private LocalDateTime parseDate(String fieldValue) {
|| fieldValue.length() == "yyyy-MM-dd".length()) {
formatDate = fieldValue + " 00:00:00";
}
if(fieldValue.length() == "2025-02-07 03:06:16.693985+00:00".length()){
// Remove the offset (+00:00)
formatDate = formatDate.substring(0, formatDate.lastIndexOf("+"));
}
DateTimeFormatter dateTimeFormatter = dateTimeFormatterMap.get(formatDate.length());
if (dateTimeFormatter == null) {
throw new ElasticsearchConnectorException(
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.IAM;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
@@ -33,6 +34,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
@@ -77,19 +79,41 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
SeaTunnelRowDeserializer deserializer =
new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
SourceConfig sourceIndexInfo = split.getSourceConfig();
ScrollResult scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
ScrollResult scrollResult = null;
if(connConfig.get(IAM).get("service_name").equals("aoss")) {
scrollResult =
esRestClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
esRestClient.search(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
Collections.emptyList(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
scrollResult =
esRestClient.search(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
scrollResult.getLastSort(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
}
}else {
scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult =
esRestClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
}
}
} else if (noMoreSplit) {
// signal to the source that we have reached the end of the data.
Original file line number Diff line number Diff line change
@@ -117,7 +117,15 @@ public Object convertBySeaTunnelType(
}
case ROW:
SeaTunnelRow row = (SeaTunnelRow) value;
return JsonUtils.toJsonString(row.getFields());
SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
JsonObject data = new JsonObject();
for (int i = 0; i < rowType.getFieldNames().length; i++) {
SeaTunnelDataType<?> subFieldType = rowType.getFieldType(i);
Object subValue = row.getField(i);
Object subRow = convertBySeaTunnelType(subFieldType, false, subValue);
data.add(rowType.getFieldNames()[i], gson.toJsonTree(subRow));
}
return data;
case MAP:
return JsonUtils.toJsonString(value);
default:
@@ -245,7 +253,7 @@ public static io.milvus.v2.common.DataType convertSqlTypeToDataType(SqlType sqlT
case TIMESTAMP:
return io.milvus.v2.common.DataType.VarChar;
case ROW:
return io.milvus.v2.common.DataType.VarChar;
return io.milvus.v2.common.DataType.JSON;
}
throw new CatalogException(
String.format("Not support convert to milvus type, sqlType is %s", sqlType));

0 comments on commit 79ac9e3

Please sign in to comment.