Skip to content

Commit

Permalink
master merge fix
Browse files Browse the repository at this point in the history
  • Loading branch information
rashi-j committed Nov 15, 2024
2 parents dccc2f9 + e928462 commit 3ab829f
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,9 @@ public final class Constants {
public static final String ACTIVE_STATE_VALUE = "ACTIVE";
public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent";
public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id";
public static final String TASK_HEADER_ATLAN_VIA_UI = "x-atlan-via-ui";
public static final String TASK_HEADER_ATLAN_PKG_NAME = "x-atlan-package-name";
public static final String TASK_HEADER_ATLAN_AGENT_WORKFLOW_ID = "x-atlan-agent-workflow-id";
public static final String TASK_HEADER_ATLAN_VIA_UI = "x-atlan-via-ui";
public static final String TASK_HEADER_ATLAN_REQUEST_ID = "x-atlan-request-id";
public static final String TASK_HEADER_ATLAN_GOOGLE_SHEETS_ID = "x-atlan-google-sheets-id";
public static final String TASK_HEADER_ATLAN_MS_EXCEL_ID = "x-atlan-microsoft-excel-id";
Expand All @@ -377,7 +377,7 @@ public final class Constants {
add(TASK_HEADER_ATLAN_GOOGLE_SHEETS_ID);
add(TASK_HEADER_ATLAN_MS_EXCEL_ID);
}};

/**
* Index Recovery vertex property keys.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar
DirectIndexQueryResult result = null;

try {
if(searchParams.isCallAsync()) {
if(searchParams.isCallAsync() || AtlasConfiguration.ENABLE_ASYNC_INDEXSEARCH.getBoolean()) {
return performAsyncDirectIndexQuery(searchParams);
} else{
String responseString = performDirectIndexQuery(searchParams.getQuery(), false);
Expand Down Expand Up @@ -184,7 +184,12 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
// then we need to delete the previous search context async
processRequestWithSameSearchContextId(searchParams);
}
AsyncQueryResult response = submitAsyncSearch(searchParams, false).get();

String KeepAliveTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong() +"s";
if (searchParams.getRequestTimeoutInSecs() != null) {
KeepAliveTime = searchParams.getRequestTimeoutInSecs() +"s";
}
AsyncQueryResult response = submitAsyncSearch(searchParams, KeepAliveTime, false).get();
if(response.isRunning()) {
/*
* If the response is still running, then we need to wait for the response
Expand All @@ -200,8 +205,11 @@ private DirectIndexQueryResult performAsyncDirectIndexQuery(SearchParams searchP
}
response = getAsyncSearchResponse(searchParams, esSearchId).get();
if (response == null) {
// Return null, if the response is null wil help returning @204 HTTP_NO_CONTENT to the user
return null;
// Rather than null (if the response is null wil help returning @204 HTTP_NO_CONTENT to the user)
// return timeout exception to user
LOG.error("timeout exceeded for query {}:", searchParams.getQuery());
RequestContext.get().endMetricRecord(RequestContext.get().startMetricRecord("elasticQueryTimeout"));
throw new AtlasBaseException(AtlasErrorCode.INDEX_SEARCH_FAILED_DUE_TO_TIMEOUT, KeepAliveTime);
}
result = getResultFromResponse(response.getFullResponse(), true);
} else {
Expand Down Expand Up @@ -349,14 +357,10 @@ public void onFailure(Exception exception) {
lowLevelRestClient.performRequestAsync(request, responseListener);
}

private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, boolean source) {
private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, String KeepAliveTime, boolean source) {
CompletableFuture<AsyncQueryResult> future = new CompletableFuture<>();
HttpEntity entity = new NStringEntity(searchParams.getQuery(), ContentType.APPLICATION_JSON);
String endPoint;
String KeepAliveTime = AtlasConfiguration.INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS.getLong() +"s";
if (searchParams.getRequestTimeoutInSecs() != null) {
KeepAliveTime = searchParams.getRequestTimeoutInSecs() +"s";
}

if (source) {
endPoint = index + "/_async_search";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public enum AtlasConfiguration {
HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"),

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),
ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false),
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
Expand Down
2 changes: 2 additions & 0 deletions intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ public enum AtlasErrorCode {
FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"),
REPAIR_INDEX_FAILED(500, "ATLAS-500-00-018", "Error occurred while repairing indices: {0}"),
INDEX_SEARCH_FAILED(400, "ATLAS-400-00-102", "Error occurred while running direct index query on ES: {0}"),

INDEX_SEARCH_FAILED_DUE_TO_TIMEOUT(429, "ATLAS-400-00-502", "ES query exceeded timeout: {0}"),
DEPRECATED_API(400, "ATLAS-400-00-103", "Deprecated API. Use {0} instead"),
DISABLED_API(400, "ATLAS-400-00-104", "API temporarily disabled. Reason: {0}"),
HAS_LINEAGE_GET_EDGE_FAILED(500, "ATLAS-500-00-019", "Error occurred while getting edge between vertices for hasLineage migration: {0}"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,9 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
createCommonVertexIndex(management, TASK_END_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_AGENT, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_AGENT_ID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_VIA_UI, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_PKG_NAME, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_AGENT_WORKFLOW_ID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_VIA_UI, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_REQUEST_ID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_GOOGLE_SHEETS_ID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_HEADER_ATLAN_MS_EXCEL_ID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ public void setBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType enti
}

if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
updateModificationMetadata(entityVertex);
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
}

Expand Down Expand Up @@ -823,6 +824,7 @@ public void addOrUpdateBusinessAttributes(AtlasVertex entityVertex, AtlasEntityT
}

if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
updateModificationMetadata(entityVertex);
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
}

Expand Down Expand Up @@ -875,6 +877,7 @@ public void removeBusinessAttributes(AtlasVertex entityVertex, AtlasEntityType e
}

if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
updateModificationMetadata(entityVertex);
entityChangeNotifier.onBusinessAttributesUpdated(AtlasGraphUtilsV2.getIdFromVertex(entityVertex), updatedBusinessAttributes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ public AtlasVertex createTaskVertex(AtlasTask task) {
}
}
}

setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters()));
setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
Expand Down

0 comments on commit 3ab829f

Please sign in to comment.