diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml index 01217063a7..96efa91750 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml +++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml @@ -68,5 +68,16 @@ guava ${guava.version} + + software.amazon.awssdk + auth + 2.29.47 + compile + + + io.github.acm19 + aws-request-signing-apache-interceptor + 3.0.0 + diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java index 2207d416a2..f2a0811e87 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java @@ -17,11 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client; +import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor; import org.apache.http.Header; import org.apache.http.HttpRequestInterceptor; import org.apache.http.message.BasicHeader; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.KNN_VECTOR; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.VECTOR; +import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.IAM; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode; @@ -80,6 +82,11 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DATE_NANOS; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.DENSE_VECTOR; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner; +import software.amazon.awssdk.regions.Region; @Slf4j public class EsRestClient implements Closeable { @@ -89,12 +96,14 @@ public class EsRestClient implements Closeable { private static final int SOCKET_TIMEOUT = 5 * 60 * 1000; private final RestClient restClient; + private static ReadonlyConfig connConfig; private EsRestClient(RestClient restClient) { this.restClient = restClient; } public static EsRestClient createInstance(ReadonlyConfig config) { + connConfig = config; List hosts = config.get(EsClusterConnectionConfig.HOSTS); Optional cloudId = config.getOptional(EsClusterConnectionConfig.CLOUD_ID); Optional username = config.getOptional(EsClusterConnectionConfig.USERNAME); @@ -200,6 +209,23 @@ private static RestClientBuilder getRestClientBuilder( httpClientBuilder.addInterceptorFirst((HttpRequestInterceptor) (request, context) -> { request.addHeader(apiKeyHeader); }); + }else{ + Map iam = connConfig.get(IAM); + AwsCredentialsProvider awsCredentials = StaticCredentialsProvider.create( + AwsBasicCredentials.create(iam.get("access_key"), iam.get("secret_key")) + ); + // AWS Region & Service + String region = iam.get("region"); // Update to match your OpenSearch region + String serviceName = iam.get("service_name"); // Use "es" for Managed OpenSearch, "aoss" for Serverless + + // OpenSearch Host (WITHOUT "https://") + HttpRequestInterceptor awsSigV4Interceptor = new AwsRequestSigningApacheInterceptor( + serviceName, + AwsV4HttpSigner.create(), + awsCredentials, + region + ); + httpClientBuilder.addInterceptorLast(awsSigV4Interceptor); } try { @@ -328,6 +354,28 @@ public ScrollResult searchByScroll( String endpoint = "/" + index + "/_search?scroll=" + scrollTime; return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param)); } + public ScrollResult search(String index, List source, Map query, List searchAfter, int pageSize) { + Map param = new HashMap<>(); + param.put("query", query); + param.put("_source", source); + param.put("size", pageSize); // ✅ Use pageSize dynamically + param.put("sort", List.of( + Map.of("_id", "asc") + )); // ✅ Stable sorting + + if (searchAfter != null && !searchAfter.isEmpty()) { + param.put("search_after", searchAfter); // ✅ Add `search_after` only if available + } + + String endpoint = "/" + index + "/_search"; + + try { + return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param)); + } catch (Exception e) { + System.err.println("Error during search_after request: " + e.getMessage()); + return null; + } + } /** * scroll to get result call _search/scroll @@ -383,7 +431,7 @@ private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBod private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) { ScrollResult scrollResult = new ScrollResult(); - String scrollId = responseJson.get("_scroll_id").asText(); + String scrollId = responseJson.get("_scroll_id") == null ? null : responseJson.get("_scroll_id").asText(); scrollResult.setScrollId(scrollId); JsonNode hitsNode = responseJson.get("hits").get("hits"); @@ -407,6 +455,23 @@ private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) { } docs.add(doc); } + if (hitsNode.isArray() && !hitsNode.isEmpty()) { + JsonNode lastSortNode = hitsNode.get(hitsNode.size() - 1).get("sort"); // Get last "sort" + // Convert to List + List lastSortList = new ArrayList<>(); + if (lastSortNode != null && lastSortNode.isArray()) { + for (JsonNode valueNode : lastSortNode) { + if (valueNode.isNumber()) { + lastSortList.add(valueNode.numberValue()); // Add as Integer/Long/Double + } else if (valueNode.isTextual()) { + lastSortList.add(valueNode.textValue()); // Add as String + } else { + lastSortList.add(valueNode.toString()); // Fallback + } + } + } + scrollResult.setLastSort(lastSortList); + } return scrollResult; } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java index c59fd09448..8cba54ae72 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Options; import java.util.List; +import java.util.Map; public class EsClusterConnectionConfig { @@ -35,6 +36,11 @@ public class EsClusterConnectionConfig { .stringType() .noDefaultValue() .withDescription("Elasticsearch cloud id"); + public static final Option> IAM = + Options.key("iam") + .mapType() + .noDefaultValue() + .withDescription("OpenSearch service name"); public static final Option USERNAME = Options.key("username") diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java index 1368775e52..e81ffef359 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/ScrollResult.java @@ -26,5 +26,6 @@ public class ScrollResult { private String scrollId; + private List lastSort; private List> docs; } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java index 404699c7ad..7184762b3b 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java @@ -98,6 +98,8 @@ public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer put( "yyyy-MM-dd HH:mm:ss.SSSSSSSSS".length(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")); + put("2025-02-07 03:06:16.693985+00:00".length(), + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSXXX")); } }; @@ -250,6 +252,10 @@ private LocalDateTime parseDate(String fieldValue) { || fieldValue.length() == "yyyy-MM-dd".length()) { formatDate = fieldValue + " 00:00:00"; } + if(fieldValue.length() == "2025-02-07 03:06:16.693985+00:00".length()){ + // Remove the offset (+00:00) + formatDate = formatDate.substring(0, formatDate.lastIndexOf("+")); + } DateTimeFormatter dateTimeFormatter = dateTimeFormatterMap.get(formatDate.length()); if (dateTimeFormatter == null) { throw new ElasticsearchConnectorException( diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java index a58c2c622d..ff8203a850 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.IAM; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -77,19 +79,41 @@ public void pollNext(Collector output) throws Exception { SeaTunnelRowDeserializer deserializer = new DefaultSeaTunnelRowDeserializer(seaTunnelRowType); SourceConfig sourceIndexInfo = split.getSourceConfig(); - ScrollResult scrollResult = - esRestClient.searchByScroll( - sourceIndexInfo.getIndex(), - sourceIndexInfo.getSource(), - sourceIndexInfo.getQuery(), - sourceIndexInfo.getScrollTime(), - sourceIndexInfo.getScrollSize()); - outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); - while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) { + ScrollResult scrollResult = null; + if(connConfig.get(IAM).get("service_name").equals("aoss")) { scrollResult = - esRestClient.searchWithScrollId( - scrollResult.getScrollId(), sourceIndexInfo.getScrollTime()); + esRestClient.search( + sourceIndexInfo.getIndex(), + sourceIndexInfo.getSource(), + sourceIndexInfo.getQuery(), + Collections.emptyList(), + sourceIndexInfo.getScrollSize()); outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); + while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) { + scrollResult = + esRestClient.search( + sourceIndexInfo.getIndex(), + sourceIndexInfo.getSource(), + sourceIndexInfo.getQuery(), + scrollResult.getLastSort(), + sourceIndexInfo.getScrollSize()); + outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); + } + }else { + scrollResult = + esRestClient.searchByScroll( + sourceIndexInfo.getIndex(), + sourceIndexInfo.getSource(), + sourceIndexInfo.getQuery(), + sourceIndexInfo.getScrollTime(), + sourceIndexInfo.getScrollSize()); + outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); + while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) { + scrollResult = + esRestClient.searchWithScrollId( + scrollResult.getScrollId(), sourceIndexInfo.getScrollTime()); + outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); + } } } else if (noMoreSplit) { // signal to the source that we have reached the end of the data. diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/utils/MilvusSinkConverter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/utils/MilvusSinkConverter.java index 519a1f92d9..784719afe9 100644 --- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/utils/MilvusSinkConverter.java +++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/utils/MilvusSinkConverter.java @@ -117,7 +117,15 @@ public Object convertBySeaTunnelType( } case ROW: SeaTunnelRow row = (SeaTunnelRow) value; - return JsonUtils.toJsonString(row.getFields()); + SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType; + JsonObject data = new JsonObject(); + for (int i = 0; i < rowType.getFieldNames().length; i++) { + SeaTunnelDataType subFieldType = rowType.getFieldType(i); + Object subValue = row.getField(i); + Object subRow = convertBySeaTunnelType(subFieldType, false, subValue); + data.add(rowType.getFieldNames()[i], gson.toJsonTree(subRow)); + } + return data; case MAP: return JsonUtils.toJsonString(value); default: @@ -245,7 +253,7 @@ public static io.milvus.v2.common.DataType convertSqlTypeToDataType(SqlType sqlT case TIMESTAMP: return io.milvus.v2.common.DataType.VarChar; case ROW: - return io.milvus.v2.common.DataType.VarChar; + return io.milvus.v2.common.DataType.JSON; } throw new CatalogException( String.format("Not support convert to milvus type, sqlType is %s", sqlType));