From 364c4aa20edbcf59a57a47d5c7803029519ac633 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Wed, 13 Nov 2024 20:18:37 +0530 Subject: [PATCH 01/21] PLT-2568 optimise vertex fetch from janusgraph --- .../discovery/EntityDiscoveryService.java | 156 +++++++++++------- 1 file changed, 97 insertions(+), 59 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5c66f77e05..5ceafec5af 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -70,6 +70,9 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; @@ -84,6 +87,8 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; + private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors(); + private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores private final AtlasGraph graph; private final EntityGraphRetriever entityRetriever; @@ -1071,78 +1076,111 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro } } + @SuppressWarnings("rawtypes") private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { SearchParams searchParams = ret.getSearchParameters(); - try { - if(LOG.isDebugEnabled()){ - LOG.debug("Preparing search results for ({})", ret.getSearchParameters()); - } - Iterator iterator = indexQueryResult.getIterator(); - boolean showSearchScore = searchParams.getShowSearchScore(); - if (iterator == null) { - return; - } + boolean showSearchScore = searchParams.getShowSearchScore(); + List results = new ArrayList<>(); - while (iterator.hasNext()) { - Result result = iterator.next(); - AtlasVertex vertex = result.getVertex(); - - if (vertex == null) { - LOG.warn("vertex in null"); - continue; - } + // Collect results for batch processing + Iterator iterator = indexQueryResult.getIterator(); + while (iterator != null && iterator.hasNext()) { + results.add(iterator.next()); + } - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if(RequestContext.get().includeClassifications()){ - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - if (showSearchScore) { - ret.addEntityScore(header.getGuid(), result.getScore()); - } - if (fetchCollapsedResults) { - Map collapse = new HashMap<>(); - - Set collapseKeys = result.getCollapseKeys(); - for (String collapseKey : collapseKeys) { - AtlasSearchResult collapseRet = new AtlasSearchResult(); - collapseRet.setSearchParameters(ret.getSearchParameters()); - - Set collapseResultAttributes = new HashSet<>(); - if (searchParams.getCollapseAttributes() != null) { - collapseResultAttributes.addAll(searchParams.getCollapseAttributes()); - } else { - collapseResultAttributes = resultAttributes; + // Batch fetch vertices + List vertices = results.stream() + .map(Result::getVertex) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + // Use ConcurrentHashMap for thread-safe access + ConcurrentHashMap headers = new ConcurrentHashMap<>(); + + // Run vertex processing in limited parallel threads + CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() -> + vertices.parallelStream().forEach(vertex -> { + String guid = vertex.getProperty("guid", String.class); + headers.computeIfAbsent(guid, k -> { + try { + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if (RequestContext.get().includeClassifications()) { + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + return header; + } catch (AtlasBaseException e) { + throw new RuntimeException(e); } + }); + }) + ).join(), CUSTOMTHREADPOOL); - if (searchParams.getCollapseRelationAttributes() != null) { - RequestContext.get().getRelationAttrsForSearch().clear(); - RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes()); - } + // Process results and handle collapse in parallel + results.parallelStream().forEach(result -> { + AtlasVertex vertex = result.getVertex(); + if (vertex == null) return; - DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); - collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); - prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false); + String guid = vertex.getProperty("guid", String.class); + AtlasEntityHeader header = headers.get(guid); - collapseRet.setSearchParameters(null); - collapse.put(collapseKey, collapseRet); - } - if (!collapse.isEmpty()) { - header.setCollapse(collapse); - } + if (showSearchScore) { + ret.addEntityScore(header.getGuid(), result.getScore()); + } + + if (fetchCollapsedResults) { + Map collapse; + try { + collapse = processCollapseResults(result, searchParams, resultAttributes); + } catch (AtlasBaseException e) { + throw new RuntimeException(e); } - if (searchParams.getShowSearchMetadata()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); - ret.addSort(header.getGuid(), result.getSort()); - } else if (searchParams.getShowHighlights()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); + if (!collapse.isEmpty()) { + header.setCollapse(collapse); } + } - ret.addEntity(header); + if (searchParams.getShowSearchMetadata()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); + ret.addSort(header.getGuid(), result.getSort()); + } else if (searchParams.getShowHighlights()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); } - } catch (Exception e) { - throw e; + + ret.addEntity(header); + }); + + if (!searchParams.getEnableFullRestriction()) { + scrubSearchResults(ret, searchParams.getSuppressLogs()); } - scrubSearchResults(ret, searchParams.getSuppressLogs()); + } + + // Non-recursive collapse processing + private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { + Map collapse = new HashMap<>(); + Set collapseKeys = result.getCollapseKeys(); + + for (String collapseKey : collapseKeys) { + AtlasSearchResult collapseRet = new AtlasSearchResult(); + collapseRet.setSearchParameters(searchParams); + Set collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes)); + DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); + collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); + + // Directly iterate over collapse vertices + Iterator iterator = indexQueryCollapsedResult.getIterator(); + while (iterator != null && iterator.hasNext()) { + Result collapseResult = iterator.next(); + AtlasVertex collapseVertex = collapseResult.getVertex(); + if (collapseVertex == null) continue; + + AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes); + collapseRet.addEntity(collapseHeader); + } + + collapse.put(collapseKey, collapseRet); + } + + return collapse; } private Map getMap(String key, Object value) { From 8c7ca544b2e93e9832b56db07037c97692920816 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 08:16:33 +0530 Subject: [PATCH 02/21] remove method --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5ceafec5af..6d0374c7ea 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1148,10 +1148,6 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i ret.addEntity(header); }); - - if (!searchParams.getEnableFullRestriction()) { - scrubSearchResults(ret, searchParams.getSuppressLogs()); - } } // Non-recursive collapse processing From 3df42f5c1ec76811c94e30112986940dedacc845 Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 16:22:44 +0530 Subject: [PATCH 03/21] Fix property name --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 6d0374c7ea..93d89a4640 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1100,7 +1100,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i // Run vertex processing in limited parallel threads CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() -> vertices.parallelStream().forEach(vertex -> { - String guid = vertex.getProperty("guid", String.class); + String guid = vertex.getProperty("__guid", String.class); headers.computeIfAbsent(guid, k -> { try { AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); @@ -1120,7 +1120,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i AtlasVertex vertex = result.getVertex(); if (vertex == null) return; - String guid = vertex.getProperty("guid", String.class); + String guid = vertex.getProperty("__guid", String.class); AtlasEntityHeader header = headers.get(guid); if (showSearchScore) { From 0bb62fca5c9305bb4e7583eac8dbea2efd2255be Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 16:23:54 +0530 Subject: [PATCH 04/21] add null cehck --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 93d89a4640..94ff39616a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1146,7 +1146,9 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i ret.addHighlights(header.getGuid(), result.getHighLights()); } - ret.addEntity(header); + if (header != null) { + ret.addEntity(header); + } }); } From 9b966a61859a6a861c0d75c4f007cc48e644f0ae Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 16:31:50 +0530 Subject: [PATCH 05/21] remove submit call --- .../discovery/EntityDiscoveryService.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 94ff39616a..669dc3ea3a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1098,22 +1098,20 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i ConcurrentHashMap headers = new ConcurrentHashMap<>(); // Run vertex processing in limited parallel threads - CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() -> - vertices.parallelStream().forEach(vertex -> { - String guid = vertex.getProperty("__guid", String.class); - headers.computeIfAbsent(guid, k -> { - try { - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if (RequestContext.get().includeClassifications()) { - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - return header; - } catch (AtlasBaseException e) { - throw new RuntimeException(e); - } - }); - }) - ).join(), CUSTOMTHREADPOOL); + CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { + String guid = vertex.getProperty("__guid", String.class); + headers.computeIfAbsent(guid, k -> { + try { + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if (RequestContext.get().includeClassifications()) { + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + return header; + } catch (AtlasBaseException e) { + throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); + } + }); + }), CUSTOMTHREADPOOL).join(); // Process results and handle collapse in parallel results.parallelStream().forEach(result -> { From b0530b36b73774e3ff566112faa3e4a262aa6237 Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 17:36:05 +0530 Subject: [PATCH 06/21] handle cocurrent modification exception --- .../apache/atlas/discovery/EntityDiscoveryService.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 669dc3ea3a..1622cd3ea4 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1096,6 +1096,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i // Use ConcurrentHashMap for thread-safe access ConcurrentHashMap headers = new ConcurrentHashMap<>(); + ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); // Run vertex processing in limited parallel threads CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { @@ -1145,9 +1146,14 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i } if (header != null) { - ret.addEntity(header); + entitiesSet.put(header.getGuid(), header); } }); + + ret.setEntities(new ArrayList<>(entitiesSet.values())); + if (!searchParams.getEnableFullRestriction()) { + scrubSearchResults(ret, searchParams.getSuppressLogs()); + } } // Non-recursive collapse processing From 52e645dbfbde073e0c6c1532b992203eb915c56f Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 18:27:06 +0530 Subject: [PATCH 07/21] update libraries --- .../apache/atlas/model/discovery/AtlasSearchResult.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index 96bc0dc587..963e850636 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -31,13 +31,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.LinkedHashMap; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; From c323ad700f4b258015b07867c9ad43999fe575cc Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 18:41:52 +0530 Subject: [PATCH 08/21] remove scrub results --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 1622cd3ea4..7e9cbb9890 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1149,11 +1149,6 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i entitiesSet.put(header.getGuid(), header); } }); - - ret.setEntities(new ArrayList<>(entitiesSet.values())); - if (!searchParams.getEnableFullRestriction()) { - scrubSearchResults(ret, searchParams.getSuppressLogs()); - } } // Non-recursive collapse processing From 9741f59812ef42cb5ee7fc0928e5f52a765b1229 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 09:14:25 +0530 Subject: [PATCH 09/21] fix results --- .../java/org/apache/atlas/discovery/EntityDiscoveryService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 7e9cbb9890..9f2fd920ef 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1149,6 +1149,8 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i entitiesSet.put(header.getGuid(), header); } }); + + ret.setEntities(new ArrayList<>(entitiesSet.values())); } // Non-recursive collapse processing From 15451640f30b32b5cdb4ce0dd991ae44d2ca1ec6 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 10:40:41 +0530 Subject: [PATCH 10/21] add dummy commit --- .../java/org/apache/atlas/discovery/EntityDiscoveryService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 9f2fd920ef..d69c0cae4c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1149,7 +1149,6 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i entitiesSet.put(header.getGuid(), header); } }); - ret.setEntities(new ArrayList<>(entitiesSet.values())); } From 953bce5bfce5ff0ddb385cabfe2d353ccdc1d9e9 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 11:01:08 +0530 Subject: [PATCH 11/21] add branch --- .github/workflows/maven.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b5589..bfc6835693 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,6 +26,7 @@ on: - development - master - lineageondemand + - janus-optimisation jobs: build: From c6c284bcf7a85f6b28d9140c5ec6588b9cf9dcef Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 11:09:02 +0530 Subject: [PATCH 12/21] add janus branch --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index bfc6835693..6227a0cf64 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,7 +26,7 @@ on: - development - master - lineageondemand - - janus-optimisation + - janusoptimisation jobs: build: From d27ea4dcb9a5f4a22875175ec9497565437e65a5 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 17:52:14 +0530 Subject: [PATCH 13/21] scrub results --- .../java/org/apache/atlas/discovery/EntityDiscoveryService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index d69c0cae4c..cfdac220a3 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1150,6 +1150,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i } }); ret.setEntities(new ArrayList<>(entitiesSet.values())); + scrubSearchResults(ret, searchParams.getSuppressLogs()); } // Non-recursive collapse processing From 0015ef7762eb9ea519b921584b144839d306ad42 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 18:13:18 +0530 Subject: [PATCH 14/21] add flag for janusgraph optimisation --- .../main/java/org/apache/atlas/AtlasConfiguration.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index df2bca7860..388914946c 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -113,7 +113,16 @@ public enum AtlasConfiguration { HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"), INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), + + /** + * hits elastic search async APU + */ ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false), + + /*** + * enables parallel processing of janus graph vertices from cassandra + */ + ENABLE_JANUS_GRAPH_OPTIMISATION("atlas.janus.graph.optimisation.enable", false), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), From 54103c8a1341ae8699a6b84c2460352b08a84857 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 18:14:05 +0530 Subject: [PATCH 15/21] add flag while retrieving from cassandra --- .../discovery/EntityDiscoveryService.java | 83 ++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index cfdac220a3..bba024bcb4 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1077,7 +1077,7 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro } @SuppressWarnings("rawtypes") - private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { + private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { SearchParams searchParams = ret.getSearchParameters(); boolean showSearchScore = searchParams.getShowSearchScore(); List results = new ArrayList<>(); @@ -1153,6 +1153,87 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i scrubSearchResults(ret, searchParams.getSuppressLogs()); } + private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { + SearchParams searchParams = ret.getSearchParameters(); + try { + if(LOG.isDebugEnabled()){ + LOG.debug("Preparing search results for ({})", ret.getSearchParameters()); + } + Iterator iterator = indexQueryResult.getIterator(); + boolean showSearchScore = searchParams.getShowSearchScore(); + if (iterator == null) { + return; + } + + while (iterator.hasNext()) { + Result result = iterator.next(); + AtlasVertex vertex = result.getVertex(); + + if (vertex == null) { + LOG.warn("vertex in null"); + continue; + } + + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if(RequestContext.get().includeClassifications()){ + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + if (showSearchScore) { + ret.addEntityScore(header.getGuid(), result.getScore()); + } + if (fetchCollapsedResults) { + Map collapse = new HashMap<>(); + + Set collapseKeys = result.getCollapseKeys(); + for (String collapseKey : collapseKeys) { + AtlasSearchResult collapseRet = new AtlasSearchResult(); + collapseRet.setSearchParameters(ret.getSearchParameters()); + + Set collapseResultAttributes = new HashSet<>(); + if (searchParams.getCollapseAttributes() != null) { + collapseResultAttributes.addAll(searchParams.getCollapseAttributes()); + } else { + collapseResultAttributes = resultAttributes; + } + + if (searchParams.getCollapseRelationAttributes() != null) { + RequestContext.get().getRelationAttrsForSearch().clear(); + RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes()); + } + + DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); + collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); + prepareSearchResultSync(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false); + + collapseRet.setSearchParameters(null); + collapse.put(collapseKey, collapseRet); + } + if (!collapse.isEmpty()) { + header.setCollapse(collapse); + } + } + if (searchParams.getShowSearchMetadata()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); + ret.addSort(header.getGuid(), result.getSort()); + } else if (searchParams.getShowHighlights()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); + } + + ret.addEntity(header); + } + } catch (Exception e) { + throw e; + } + scrubSearchResults(ret, searchParams.getSuppressLogs()); + } + + private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { + if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) { + LOG.debug("enabled janusGraphOptimisation"); + prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); + } + prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); + } // Non-recursive collapse processing private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { Map collapse = new HashMap<>(); From b903c7b602178b8cc37d84a01a878b330b78b233 Mon Sep 17 00:00:00 2001 From: aarshi Date: Tue, 19 Nov 2024 19:40:29 +0530 Subject: [PATCH 16/21] log ES query error --- .../atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java index dc46861b12..fb6fe85271 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasElasticsearchQuery.java @@ -431,7 +431,7 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla LOG.warn(String.format("ES index with name %s not found", index)); throw new AtlasBaseException(INDEX_NOT_FOUND, index); } else { - throw new AtlasBaseException(rex); + throw new AtlasBaseException(String.format("Error in executing elastic query: %s", EntityUtils.toString(entity)), rex); } } From dc7765771f714fff69430a6946cad921a250f672 Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 20 Nov 2024 15:42:00 +0530 Subject: [PATCH 17/21] fix map vertex method --- .../store/graph/v2/EntityGraphRetriever.java | 212 ++++++++++++++---- 1 file changed, 169 insertions(+), 43 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 34d3036042..cb4f515b8b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -1009,90 +1009,216 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) return mapVertexToAtlasEntityHeader(entityVertex, Collections.emptySet()); } +// private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { +// AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader"); +// AtlasEntityHeader ret = new AtlasEntityHeader(); +// try { +// String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); +// String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); +// Boolean isIncomplete = isEntityIncomplete(entityVertex); +// +// ret.setTypeName(typeName); +// ret.setGuid(guid); +// ret.setStatus(GraphHelper.getStatus(entityVertex)); +// RequestContext context = RequestContext.get(); +// boolean includeClassifications = context.includeClassifications(); +// boolean includeClassificationNames = context.isIncludeClassificationNames(); +// if(includeClassifications){ +// ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); +// } else if (!includeClassifications && includeClassificationNames) { +// ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); +// } +// ret.setIsIncomplete(isIncomplete); +// ret.setLabels(getLabels(entityVertex)); +// +// ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex)); +// ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex)); +// ret.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex))); +// ret.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex))); +// +// if(RequestContext.get().includeMeanings()) { +// List termAssignmentHeaders = mapAssignedTerms(entityVertex); +// ret.setMeanings(termAssignmentHeaders); +// ret.setMeaningNames( +// termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText) +// .collect(Collectors.toList())); +// } +// AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); +// +// if (entityType != null) { +// for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { +// Object attrValue = getVertexAttribute(entityVertex, headerAttribute); +// +// if (attrValue != null) { +// ret.setAttribute(headerAttribute.getName(), attrValue); +// } +// } +// +// Object displayText = getDisplayText(entityVertex, entityType); +// +// if (displayText != null) { +// ret.setDisplayText(displayText.toString()); +// } +// +// if (CollectionUtils.isNotEmpty(attributes)) { +// for (String attrName : attributes) { +// AtlasAttribute attribute = entityType.getAttribute(attrName); +// +// if (attribute == null) { +// attrName = toNonQualifiedName(attrName); +// +// if (ret.hasAttribute(attrName)) { +// continue; +// } +// +// attribute = entityType.getAttribute(attrName); +// +// if (attribute == null) { +// attribute = entityType.getRelationshipAttribute(attrName, null); +// } +// } +// +// Object attrValue = getVertexAttribute(entityVertex, attribute); +// +// if (attrValue != null) { +// ret.setAttribute(attrName, attrValue); +// } +// } +// } +// } +// } +// finally { +// RequestContext.get().endMetricRecord(metricRecorder); +// } +// return ret; +// } + + private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader"); + AtlasEntityHeader ret = new AtlasEntityHeader(); + try { - String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); - String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); + // Batch fetch all properties + Map vertexProperties = batchGetProperties(entityVertex); + + // Common properties + String typeName = (String) getCommonProperty(vertexProperties, Constants.TYPE_NAME_PROPERTY_KEY); + String guid = (String) getCommonProperty(vertexProperties, GUID_PROPERTY_KEY); + String createdBy = (String) getCommonProperty(vertexProperties, CREATED_BY_KEY); + String updatedBy = (String) getCommonProperty(vertexProperties, MODIFIED_BY_KEY); + Long createTime = (Long) getCommonProperty(vertexProperties, TIMESTAMP_PROPERTY_KEY); + Long updateTime = (Long) getCommonProperty(vertexProperties, MODIFICATION_TIMESTAMP_PROPERTY_KEY); Boolean isIncomplete = isEntityIncomplete(entityVertex); + // Set properties in header ret.setTypeName(typeName); ret.setGuid(guid); ret.setStatus(GraphHelper.getStatus(entityVertex)); - RequestContext context = RequestContext.get(); - boolean includeClassifications = context.includeClassifications(); - boolean includeClassificationNames = context.isIncludeClassificationNames(); - if(includeClassifications){ - ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); - } else if (!includeClassifications && includeClassificationNames) { - ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); - } ret.setIsIncomplete(isIncomplete); + ret.setCreatedBy(createdBy); + ret.setUpdatedBy(updatedBy); + ret.setCreateTime(createTime != null ? new Date(createTime) : null); + ret.setUpdateTime(updateTime != null ? new Date(updateTime) : null); ret.setLabels(getLabels(entityVertex)); - ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex)); - ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex)); - ret.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex))); - ret.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex))); + // Classifications + RequestContext context = RequestContext.get(); + if (context.includeClassifications() || context.isIncludeClassificationNames()) { + ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); + } - if(RequestContext.get().includeMeanings()) { + // Meanings + if (context.includeMeanings()) { List termAssignmentHeaders = mapAssignedTerms(entityVertex); ret.setMeanings(termAssignmentHeaders); - ret.setMeaningNames( - termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText) - .collect(Collectors.toList())); + ret.setMeaningNames(termAssignmentHeaders.stream() + .map(AtlasTermAssignmentHeader::getDisplayText) + .collect(Collectors.toList())); } - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + // Process entity type and attributes + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); if (entityType != null) { + // Header attributes for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { - Object attrValue = getVertexAttribute(entityVertex, headerAttribute); - + Object attrValue = getVertexAttributeFromBatch(vertexProperties, headerAttribute); if (attrValue != null) { ret.setAttribute(headerAttribute.getName(), attrValue); } } + // Display text Object displayText = getDisplayText(entityVertex, entityType); - if (displayText != null) { ret.setDisplayText(displayText.toString()); } + // Additional attributes if (CollectionUtils.isNotEmpty(attributes)) { for (String attrName : attributes) { - AtlasAttribute attribute = entityType.getAttribute(attrName); - - if (attribute == null) { - attrName = toNonQualifiedName(attrName); - - if (ret.hasAttribute(attrName)) { - continue; + AtlasAttribute attribute = getEntityOrRelationshipAttribute(entityType, attrName); + if (attribute != null) { + Object attrValue = getVertexAttributeFromBatch(vertexProperties, attribute); + if (attrValue != null) { + ret.setAttribute(attrName, attrValue); } - - attribute = entityType.getAttribute(attrName); - - if (attribute == null) { - attribute = entityType.getRelationshipAttribute(attrName, null); - } - } - - Object attrValue = getVertexAttribute(entityVertex, attribute); - - if (attrValue != null) { - ret.setAttribute(attrName, attrValue); } } } } - } - finally { + } finally { RequestContext.get().endMetricRecord(metricRecorder); } + return ret; } + + private Object getCommonProperty(Map vertexProperties, String propertyName) { + if (vertexProperties.get(propertyName) instanceof List) { + return ((List) vertexProperties.get(propertyName)).get(0); + } + return ""; + } + /** + * Fetches all properties of a vertex in one call and returns them as a map. + */ + private Map batchGetProperties(AtlasVertex vertex) { + // Use JanusGraph's Gremlin API for efficient property retrieval + return (Map) graph.V(vertex.getId()).valueMap().next(); + } + + /** + * Retrieves a vertex attribute from the pre-fetched batch of properties. + */ + private Object getVertexAttributeFromBatch(Map properties, AtlasAttribute attribute) { + if (properties == null || attribute == null) { + return null; + } + + String propertyKey = attribute.getVertexPropertyName(); + return properties.get(propertyKey); + } + + /** + * Retrieves an entity or relationship attribute from the entity type. + */ + private AtlasAttribute getEntityOrRelationshipAttribute(AtlasEntityType entityType, String attrName) { + AtlasAttribute attribute = entityType.getAttribute(attrName); + + if (attribute == null) { + attrName = toNonQualifiedName(attrName); + + attribute = entityType.getAttribute(attrName); + if (attribute == null) { + attribute = entityType.getRelationshipAttribute(attrName, null); + } + } + + return attribute; + } private String toNonQualifiedName(String attrName) { String ret; if (attrName.contains(".")) { From ec4a32d1c7b780b91da9f83a7a3cbae0fca8c427 Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 20 Nov 2024 15:52:06 +0530 Subject: [PATCH 18/21] add configuration in flag --- .../src/main/java/org/apache/atlas/AtlasConfiguration.java | 7 ++++++- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 3 +-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 388914946c..3e931b4d67 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -115,7 +115,7 @@ public enum AtlasConfiguration { INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300), /** - * hits elastic search async APU + * hits elastic search async API */ ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false), @@ -123,6 +123,11 @@ public enum AtlasConfiguration { * enables parallel processing of janus graph vertices from cassandra */ ENABLE_JANUS_GRAPH_OPTIMISATION("atlas.janus.graph.optimisation.enable", false), + + /** + * No. of threads to be spawned for parallel processing + */ + THREADS_TO_BE_SPAWNED("atlas.janus.graph.optimisation.thread_count", (Runtime.getRuntime().availableProcessors())/2), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index bba024bcb4..efa124a3fa 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -87,8 +87,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; - private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors(); - private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores + private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AtlasConfiguration.THREADS_TO_BE_SPAWNED.getInt()); // Use half of available cores private final AtlasGraph graph; private final EntityGraphRetriever entityRetriever; From f51f8f678ef7fb30bfdf6cc10ecd08b8eba942e0 Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 20 Nov 2024 15:56:30 +0530 Subject: [PATCH 19/21] fix return type --- .../atlas/repository/store/graph/v2/EntityGraphRetriever.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index cb4f515b8b..0f11a100fc 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -1180,7 +1180,7 @@ private Object getCommonProperty(Map vertexProperties, String pr if (vertexProperties.get(propertyName) instanceof List) { return ((List) vertexProperties.get(propertyName)).get(0); } - return ""; + return new Object(); } /** * Fetches all properties of a vertex in one call and returns them as a map. From 391e510ea0bf632ce0c3b0a91177a7b8059cc872 Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 20 Nov 2024 17:50:15 +0530 Subject: [PATCH 20/21] add flag for collapsed result --- intg/src/main/java/org/apache/atlas/AtlasConfiguration.java | 1 + .../java/org/apache/atlas/discovery/EntityDiscoveryService.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 3e931b4d67..9390f3959a 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -128,6 +128,7 @@ public enum AtlasConfiguration { * No. of threads to be spawned for parallel processing */ THREADS_TO_BE_SPAWNED("atlas.janus.graph.optimisation.thread_count", (Runtime.getRuntime().availableProcessors())/2), + FETCH_COLLAPSED_RESULT("atlas.indexsearch.fetch.collapsed.result", true), ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000), ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""), ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false), diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index efa124a3fa..3f8155d9c1 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1006,7 +1006,7 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl return null; } RequestContext.get().endMetricRecord(elasticSearchQueryMetric); - prepareSearchResult(ret, indexQueryResult, resultAttributes, true); + prepareSearchResult(ret, indexQueryResult, resultAttributes, AtlasConfiguration.FETCH_COLLAPSED_RESULT.getBoolean()); ret.setAggregations(indexQueryResult.getAggregationMap()); ret.setApproximateCount(indexQuery.vertexTotals()); From 875138795c9189fccb53b25b6f0e56e227cbb03e Mon Sep 17 00:00:00 2001 From: aarshi Date: Wed, 20 Nov 2024 18:10:21 +0530 Subject: [PATCH 21/21] fix updated by and createdby field --- .../repository/store/graph/v2/EntityGraphRetriever.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 0f11a100fc..bb2c3bf071 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -1106,8 +1106,6 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, // Common properties String typeName = (String) getCommonProperty(vertexProperties, Constants.TYPE_NAME_PROPERTY_KEY); String guid = (String) getCommonProperty(vertexProperties, GUID_PROPERTY_KEY); - String createdBy = (String) getCommonProperty(vertexProperties, CREATED_BY_KEY); - String updatedBy = (String) getCommonProperty(vertexProperties, MODIFIED_BY_KEY); Long createTime = (Long) getCommonProperty(vertexProperties, TIMESTAMP_PROPERTY_KEY); Long updateTime = (Long) getCommonProperty(vertexProperties, MODIFICATION_TIMESTAMP_PROPERTY_KEY); Boolean isIncomplete = isEntityIncomplete(entityVertex); @@ -1117,8 +1115,8 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, ret.setGuid(guid); ret.setStatus(GraphHelper.getStatus(entityVertex)); ret.setIsIncomplete(isIncomplete); - ret.setCreatedBy(createdBy); - ret.setUpdatedBy(updatedBy); + ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex)); + ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex)); ret.setCreateTime(createTime != null ? new Date(createTime) : null); ret.setUpdateTime(updateTime != null ? new Date(updateTime) : null); ret.setLabels(getLabels(entityVertex));