Skip to content

Commit

Permalink
Merge pull request #3737 from atlanhq/task/asyncIndexSearch
Browse files Browse the repository at this point in the history
NOC-87266 | Configure Async behaviour for Indexsearch
  • Loading branch information
aarshi0301 authored Nov 14, 2024
2 parents 4160f80 + 791c348 commit 3293b29
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar
DirectIndexQueryResult result = null;

try {
if(searchParams.isCallAsync()) {
if(searchParams.isCallAsync() || AtlasConfiguration.ENABLE_ASYNC_INDEXSEARCH.getBoolean()) {
return performAsyncDirectIndexQuery(searchParams);
} else{
String responseString = performDirectIndexQuery(searchParams.getQuery(), false);
Expand Down Expand Up @@ -176,6 +176,7 @@ private Map<String, Object> runQueryWithLowLevelClient(String query) throws Atla

private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchParams) throws AtlasBaseException, IOException {
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("performAsyncDirectIndexQuery");
AtlasPerfMetrics.MetricRecorder metricSearchTimeout = RequestContext.get().startMetricRecord("asyncDirectIndexQueryTimeout");
DirectIndexQueryResult result = null;
boolean contextIdExists = StringUtils.isNotEmpty(searchParams.getSearchContextId()) && searchParams.getSearchContextSequenceNo() != null;
try {
Expand All @@ -184,7 +185,12 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
// then we need to delete the previous search context async
processRequestWithSameSearchContextId(searchParams);
}
AsyncQueryResult response = submitAsyncSearch(searchParams, false).get();

String KeepAliveTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong() +"s";
if (searchParams.getRequestTimeoutInSecs() != null) {
KeepAliveTime = searchParams.getRequestTimeoutInSecs() +"s";
}
AsyncQueryResult response = submitAsyncSearch(searchParams, KeepAliveTime, false).get();
if(response.isRunning()) {
/*
* If the response is still running, then we need to wait for the response
Expand All @@ -200,8 +206,11 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
}
response = getAsyncSearchResponse(searchParams, esSearchId).get();
if (response == null) {
// Return null, if the response is null wil help returning @204 HTTP_NO_CONTENT to the user
return null;
// Rather than null (if the response is null wil help returning @204 HTTP_NO_CONTENT to the user)
// return timeout exception to user
LOG.error("timeout exceeded for query {}:", searchParams.getQuery());
RequestContext.get().endMetricRecord(metricSearchTimeout);
throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED_DUE_TO_TIMEOUT, KeepAliveTime);
}
result = getResultFromResponse(response.getFullResponse(), true);
} else {
Expand Down Expand Up @@ -349,14 +358,10 @@ public void onFailure(Exception exception) {
lowLevelRestClient.performRequestAsync(request, responseListener);
}

private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, boolean source) {
private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, String KeepAliveTime, boolean source) {
CompletableFuture<AsyncQueryResult> future = new CompletableFuture<>();
HttpEntity entity = new NStringEntity(searchParams.getQuery(), ContentType.APPLICATION_JSON);
String endPoint;
String KeepAliveTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong() +"s";
if (searchParams.getRequestTimeoutInSecs() != null) {
KeepAliveTime = searchParams.getRequestTimeoutInSecs() +"s";
}

if (source) {
endPoint = index + "/_async_search";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public enum AtlasConfiguration {
HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"),

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),
ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false),
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
Expand Down
2 changes: 2 additions & 0 deletions intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ public enum AtlasErrorCode {
FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"),
REPAIR_INDEX_FAILED(500, "ATLAS-500-00-018", "Error occurred while repairing indices: {0}"),
INDEX_SEARCH_FAILED(400, "ATLAS-400-00-102", "Error occurred while running direct index query on ES: {0}"),

INDEX_SEARCH_FAILED_DUE_TO_TIMEOUT(429, "ATLAS-400-00-502", "ES query exceeded timeout: {0}"),
DEPRECATED_API(400, "ATLAS-400-00-103", "Deprecated API. Use {0} instead"),
DISABLED_API(400, "ATLAS-400-00-104", "API temporarily disabled. Reason: {0}"),
HAS_LINEAGE_GET_EDGE_FAILED(500, "ATLAS-500-00-019", "Error occurred while getting edge between vertices for hasLineage migration: {0}"),
Expand Down

0 comments on commit 3293b29

Please sign in to comment.