Skip to content

Commit

Permalink
support aws opensearch serverless and iam auth
Browse files Browse the repository at this point in the history
  • Loading branch information
nianliuu committed Feb 8, 2025
1 parent 44083a1 commit e3021a4
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 14 deletions.
11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,16 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- AWS SDK v2 auth module: supplies the credential classes (AwsBasicCredentials,
     AwsCredentialsProvider, StaticCredentialsProvider) imported by EsRestClient for IAM auth.
     NOTE(review): the version is hard-coded here while the neighbouring guava dependency
     uses a property; confirm whether it should be managed centrally. -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>2.29.47</version>
<scope>compile</scope>
</dependency>
<!-- Apache HttpClient request interceptor that signs outgoing requests with AWS SigV4;
     EsRestClient imports AwsRequestSigningApacheInterceptor from this artifact. -->
<dependency>
<groupId>io.github.acm19</groupId>
<artifactId>aws-request-signing-apache-interceptor</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;

import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor;
import org.apache.http.Header;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.message.BasicHeader;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.KNN_VECTOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.VECTOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.IAM;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -80,6 +82,11 @@
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.regions.Region;

@Slf4j
public class EsRestClient implements Closeable {
Expand All @@ -89,12 +96,14 @@ public class EsRestClient implements Closeable {
private static final int SOCKET_TIMEOUT = 5 * 60 * 1000;

private final RestClient restClient;
private static ReadonlyConfig connConfig;

/**
 * Wraps an already-built low-level {@link RestClient}.
 *
 * <p>The constructor is private, so instances cannot be created directly from outside this
 * class; presumably they are obtained through the static {@code createInstance} factory.
 *
 * @param restClient the client stored in this instance's {@code restClient} field
 */
private EsRestClient(RestClient restClient) {
this.restClient = restClient;
}

public static EsRestClient createInstance(ReadonlyConfig config) {
connConfig = config;
List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
Optional<String> cloudId = config.getOptional(EsClusterConnectionConfig.CLOUD_ID);
Optional<String> username = config.getOptional(EsClusterConnectionConfig.USERNAME);
Expand Down Expand Up @@ -200,6 +209,23 @@ private static RestClientBuilder getRestClientBuilder(
httpClientBuilder.addInterceptorFirst((HttpRequestInterceptor) (request, context) -> {
request.addHeader(apiKeyHeader);
});
}else{
Map<String, String> iam = connConfig.get(IAM);
AwsCredentialsProvider awsCredentials = StaticCredentialsProvider.create(
AwsBasicCredentials.create(iam.get("access_key"), iam.get("secret_key"))
);
// AWS Region & Service
String region = iam.get("region"); // Update to match your OpenSearch region
String serviceName = iam.get("service_name"); // Use "es" for Managed OpenSearch, "aoss" for Serverless

// OpenSearch Host (WITHOUT "https://")
HttpRequestInterceptor awsSigV4Interceptor = new AwsRequestSigningApacheInterceptor(
serviceName,
AwsV4HttpSigner.create(),
awsCredentials,
region
);
httpClientBuilder.addInterceptorLast(awsSigV4Interceptor);
}

try {
Expand Down Expand Up @@ -328,6 +354,28 @@ public ScrollResult searchByScroll(
String endpoint = "/" + index + "/_search?scroll=" + scrollTime;
return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
}
/**
 * Fetches one page of documents from {@code /<index>/_search} using {@code search_after}
 * pagination instead of the scroll API.
 *
 * <p>Results are sorted ascending on {@code _id} so that the sort values of the last hit of
 * one page can be passed back as {@code searchAfter} to request the next page.
 *
 * <p>Failures are not swallowed: like {@code searchByScroll}, any exception raised while
 * building or executing the request propagates to the caller. Returning {@code null} on
 * error would only turn the real failure into a {@code NullPointerException} in callers
 * that immediately read the returned page.
 *
 * @param index name of the index to query
 * @param source field names sent as the {@code _source} filter
 * @param query query DSL sent as the {@code query} element
 * @param searchAfter sort values of the last hit of the previous page; {@code null} or empty
 *     requests the first page
 * @param pageSize maximum number of hits to return for this page
 * @return the parsed page of documents
 */
public ScrollResult search(
        String index,
        List<String> source,
        Map<String, Object> query,
        List<Object> searchAfter,
        int pageSize) {
    Map<String, Object> param = new HashMap<>();
    param.put("query", query);
    param.put("_source", source);
    param.put("size", pageSize);
    // search_after needs a deterministic order; the caller feeds the last hit's sort
    // values back in, so the sort definition must be identical on every page.
    param.put("sort", List.of(Map.of("_id", "asc")));

    // The first page has no cursor yet, so the element is only sent when one is available.
    if (searchAfter != null && !searchAfter.isEmpty()) {
        param.put("search_after", searchAfter);
    }

    String endpoint = "/" + index + "/_search";
    return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
}

/**
* scroll to get result call _search/scroll
Expand Down Expand Up @@ -383,7 +431,7 @@ private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBod

private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
ScrollResult scrollResult = new ScrollResult();
String scrollId = responseJson.get("_scroll_id").asText();
String scrollId = responseJson.get("_scroll_id") == null ? null : responseJson.get("_scroll_id").asText();
scrollResult.setScrollId(scrollId);

JsonNode hitsNode = responseJson.get("hits").get("hits");
Expand All @@ -407,6 +455,23 @@ private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
}
docs.add(doc);
}
if (hitsNode.isArray() && !hitsNode.isEmpty()) {
JsonNode lastSortNode = hitsNode.get(hitsNode.size() - 1).get("sort"); // Get last "sort"
// Convert to List<Object>
List<Object> lastSortList = new ArrayList<>();
if (lastSortNode != null && lastSortNode.isArray()) {
for (JsonNode valueNode : lastSortNode) {
if (valueNode.isNumber()) {
lastSortList.add(valueNode.numberValue()); // Add as Integer/Long/Double
} else if (valueNode.isTextual()) {
lastSortList.add(valueNode.textValue()); // Add as String
} else {
lastSortList.add(valueNode.toString()); // Fallback
}
}
}
scrollResult.setLastSort(lastSortList);
}
return scrollResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;
import java.util.Map;

public class EsClusterConnectionConfig {

Expand All @@ -35,6 +36,11 @@ public class EsClusterConnectionConfig {
.stringType()
.noDefaultValue()
.withDescription("Elasticsearch cloud id");
/**
 * AWS IAM request-signing settings, supplied as a string map under the {@code iam} key.
 *
 * <p>The REST client reads the entries {@code access_key}, {@code secret_key}, {@code region}
 * and {@code service_name} from this map when it builds the SigV4 signing interceptor.
 */
public static final Option<Map<String, String>> IAM =
        Options.key("iam")
                .mapType()
                .noDefaultValue()
                .withDescription(
                        "AWS IAM (SigV4) signing settings: access_key, secret_key, region and "
                                + "service_name ('es' for managed OpenSearch, "
                                + "'aoss' for OpenSearch Serverless)");

public static final Option<String> USERNAME =
Options.key("username")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
public class ScrollResult {
// Holder for one page of search results together with the cursors needed to request the
// next page (scroll id for the scroll API, last sort values for search_after paging).

/** Value of {@code _scroll_id} from the response, or {@code null} when the response has none. */
private String scrollId;
/**
 * Sort values taken from the {@code sort} array of the last hit in this page; the reader
 * passes them back as the {@code search_after} cursor. Only set when the page has hits.
 */
private List<Object> lastSort;
/** Documents of this page; presumably one map per hit - verify against the response parser. */
private List<Map<String, Object>> docs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer
put(
"yyyy-MM-dd HH:mm:ss.SSSSSSSSS".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"));
put("2025-02-07 03:06:16.693985+00:00".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSXXX"));
}
};

Expand Down Expand Up @@ -250,6 +252,10 @@ private LocalDateTime parseDate(String fieldValue) {
|| fieldValue.length() == "yyyy-MM-dd".length()) {
formatDate = fieldValue + " 00:00:00";
}
if(fieldValue.length() == "2025-02-07 03:06:16.693985+00:00".length()){
// Remove the offset (+00:00)
formatDate = formatDate.substring(0, formatDate.lastIndexOf("+"));
}
DateTimeFormatter dateTimeFormatter = dateTimeFormatterMap.get(formatDate.length());
if (dateTimeFormatter == null) {
throw new ElasticsearchConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.IAM;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
Expand All @@ -33,6 +34,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -77,19 +79,41 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
SeaTunnelRowDeserializer deserializer =
new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
SourceConfig sourceIndexInfo = split.getSourceConfig();
ScrollResult scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
ScrollResult scrollResult = null;
if(connConfig.get(IAM).get("service_name").equals("aoss")) {
scrollResult =
esRestClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
esRestClient.search(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
Collections.emptyList(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
scrollResult =
esRestClient.search(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
scrollResult.getLastSort(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
}
}else {
scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult =
esRestClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
}
}
} else if (noMoreSplit) {
// signal to the source that we have reached the end of the data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,15 @@ public Object convertBySeaTunnelType(
}
case ROW:
SeaTunnelRow row = (SeaTunnelRow) value;
return JsonUtils.toJsonString(row.getFields());
SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
JsonObject data = new JsonObject();
for (int i = 0; i < rowType.getFieldNames().length; i++) {
SeaTunnelDataType<?> subFieldType = rowType.getFieldType(i);
Object subValue = row.getField(i);
Object subRow = convertBySeaTunnelType(subFieldType, false, subValue);
data.add(rowType.getFieldNames()[i], gson.toJsonTree(subRow));
}
return data;
case MAP:
return JsonUtils.toJsonString(value);
default:
Expand Down Expand Up @@ -245,7 +253,7 @@ public static io.milvus.v2.common.DataType convertSqlTypeToDataType(SqlType sqlT
case TIMESTAMP:
return io.milvus.v2.common.DataType.VarChar;
case ROW:
return io.milvus.v2.common.DataType.VarChar;
return io.milvus.v2.common.DataType.JSON;
}
throw new CatalogException(
String.format("Not support convert to milvus type, sqlType is %s", sqlType));
Expand Down

0 comments on commit e3021a4

Please sign in to comment.