From 44394126fb7bbd4d9fd35b66bcea905a0bcd8cf4 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Wed, 13 Nov 2024 20:18:37 +0530 Subject: [PATCH 1/9] PLT-2568 optimise vertex fetch from janusgraph --- .../discovery/EntityDiscoveryService.java | 156 +++++++++++------- 1 file changed, 97 insertions(+), 59 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5c66f77e057..5ceafec5af0 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -70,6 +70,9 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; @@ -84,6 +87,8 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; + private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors(); + private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores private final AtlasGraph graph; private final EntityGraphRetriever entityRetriever; @@ -1071,78 +1076,111 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro } } + @SuppressWarnings("rawtypes") private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { SearchParams searchParams = ret.getSearchParameters(); - try { - if(LOG.isDebugEnabled()){ - 
LOG.debug("Preparing search results for ({})", ret.getSearchParameters()); - } - Iterator iterator = indexQueryResult.getIterator(); - boolean showSearchScore = searchParams.getShowSearchScore(); - if (iterator == null) { - return; - } + boolean showSearchScore = searchParams.getShowSearchScore(); + List results = new ArrayList<>(); - while (iterator.hasNext()) { - Result result = iterator.next(); - AtlasVertex vertex = result.getVertex(); - - if (vertex == null) { - LOG.warn("vertex in null"); - continue; - } + // Collect results for batch processing + Iterator iterator = indexQueryResult.getIterator(); + while (iterator != null && iterator.hasNext()) { + results.add(iterator.next()); + } - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if(RequestContext.get().includeClassifications()){ - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - if (showSearchScore) { - ret.addEntityScore(header.getGuid(), result.getScore()); - } - if (fetchCollapsedResults) { - Map collapse = new HashMap<>(); - - Set collapseKeys = result.getCollapseKeys(); - for (String collapseKey : collapseKeys) { - AtlasSearchResult collapseRet = new AtlasSearchResult(); - collapseRet.setSearchParameters(ret.getSearchParameters()); - - Set collapseResultAttributes = new HashSet<>(); - if (searchParams.getCollapseAttributes() != null) { - collapseResultAttributes.addAll(searchParams.getCollapseAttributes()); - } else { - collapseResultAttributes = resultAttributes; + // Batch fetch vertices + List vertices = results.stream() + .map(Result::getVertex) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + // Use ConcurrentHashMap for thread-safe access + ConcurrentHashMap headers = new ConcurrentHashMap<>(); + + // Run vertex processing in limited parallel threads + CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() -> + vertices.parallelStream().forEach(vertex -> { + String guid = vertex.getProperty("guid", 
String.class); + headers.computeIfAbsent(guid, k -> { + try { + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if (RequestContext.get().includeClassifications()) { + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + return header; + } catch (AtlasBaseException e) { + throw new RuntimeException(e); } + }); + }) + ).join(), CUSTOMTHREADPOOL); - if (searchParams.getCollapseRelationAttributes() != null) { - RequestContext.get().getRelationAttrsForSearch().clear(); - RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes()); - } + // Process results and handle collapse in parallel + results.parallelStream().forEach(result -> { + AtlasVertex vertex = result.getVertex(); + if (vertex == null) return; - DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); - collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); - prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false); + String guid = vertex.getProperty("guid", String.class); + AtlasEntityHeader header = headers.get(guid); - collapseRet.setSearchParameters(null); - collapse.put(collapseKey, collapseRet); - } - if (!collapse.isEmpty()) { - header.setCollapse(collapse); - } + if (showSearchScore) { + ret.addEntityScore(header.getGuid(), result.getScore()); + } + + if (fetchCollapsedResults) { + Map collapse; + try { + collapse = processCollapseResults(result, searchParams, resultAttributes); + } catch (AtlasBaseException e) { + throw new RuntimeException(e); } - if (searchParams.getShowSearchMetadata()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); - ret.addSort(header.getGuid(), result.getSort()); - } else if (searchParams.getShowHighlights()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); + if (!collapse.isEmpty()) { + header.setCollapse(collapse); } + } - 
ret.addEntity(header); + if (searchParams.getShowSearchMetadata()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); + ret.addSort(header.getGuid(), result.getSort()); + } else if (searchParams.getShowHighlights()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); } - } catch (Exception e) { - throw e; + + ret.addEntity(header); + }); + + if (!searchParams.getEnableFullRestriction()) { + scrubSearchResults(ret, searchParams.getSuppressLogs()); } - scrubSearchResults(ret, searchParams.getSuppressLogs()); + } + + // Non-recursive collapse processing + private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { + Map collapse = new HashMap<>(); + Set collapseKeys = result.getCollapseKeys(); + + for (String collapseKey : collapseKeys) { + AtlasSearchResult collapseRet = new AtlasSearchResult(); + collapseRet.setSearchParameters(searchParams); + Set collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes)); + DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); + collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); + + // Directly iterate over collapse vertices + Iterator iterator = indexQueryCollapsedResult.getIterator(); + while (iterator != null && iterator.hasNext()) { + Result collapseResult = iterator.next(); + AtlasVertex collapseVertex = collapseResult.getVertex(); + if (collapseVertex == null) continue; + + AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes); + collapseRet.addEntity(collapseHeader); + } + + collapse.put(collapseKey, collapseRet); + } + + return collapse; } private Map getMap(String key, Object value) { From 4fc2f43e2a99b30f6231480d729664968fe3ca2f Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 16:22:44 +0530 Subject: [PATCH 2/9] Fix property 
name --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5ceafec5af0..9303ab9170d 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1100,7 +1100,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i // Run vertex processing in limited parallel threads CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() -> vertices.parallelStream().forEach(vertex -> { - String guid = vertex.getProperty("guid", String.class); + String guid = vertex.getProperty("__guid", String.class); headers.computeIfAbsent(guid, k -> { try { AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); @@ -1120,7 +1120,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i AtlasVertex vertex = result.getVertex(); if (vertex == null) return; - String guid = vertex.getProperty("guid", String.class); + String guid = vertex.getProperty("__guid", String.class); AtlasEntityHeader header = headers.get(guid); if (showSearchScore) { From a90209633b7abc044112e14cfe75489ea3a31914 Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 16:23:54 +0530 Subject: [PATCH 3/9] add null check --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 9303ab9170d..c5cc8019dc4 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++
b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1146,7 +1146,9 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i ret.addHighlights(header.getGuid(), result.getHighLights()); } - ret.addEntity(header); + if (header != null) { + ret.addEntity(header); + } }); if (!searchParams.getEnableFullRestriction()) { From f29738a35b5d79400b2430ff327ebde363be85ee Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 16:31:50 +0530 Subject: [PATCH 4/9] remove submit call --- .../discovery/EntityDiscoveryService.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index c5cc8019dc4..f7a76485c84 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1098,22 +1098,20 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i ConcurrentHashMap headers = new ConcurrentHashMap<>(); // Run vertex processing in limited parallel threads - CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() -> - vertices.parallelStream().forEach(vertex -> { - String guid = vertex.getProperty("__guid", String.class); - headers.computeIfAbsent(guid, k -> { - try { - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if (RequestContext.get().includeClassifications()) { - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - return header; - } catch (AtlasBaseException e) { - throw new RuntimeException(e); - } - }); - }) - ).join(), CUSTOMTHREADPOOL); + CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { + String guid = vertex.getProperty("__guid", String.class); + 
headers.computeIfAbsent(guid, k -> { + try { + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if (RequestContext.get().includeClassifications()) { + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + return header; + } catch (AtlasBaseException e) { + throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); + } + }); + }), CUSTOMTHREADPOOL).join(); // Process results and handle collapse in parallel results.parallelStream().forEach(result -> { From 59bee7e843e5f802a6e5d2ec53fed068f8218334 Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 17:36:05 +0530 Subject: [PATCH 5/9] handle concurrent modification exception --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index f7a76485c84..1622cd3ea4c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1096,6 +1096,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i // Use ConcurrentHashMap for thread-safe access ConcurrentHashMap headers = new ConcurrentHashMap<>(); + ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); // Run vertex processing in limited parallel threads CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { @@ -1145,10 +1146,11 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i } if (header != null) { - ret.addEntity(header); + entitiesSet.put(header.getGuid(), header); } }); + ret.setEntities(new ArrayList<>(entitiesSet.values())); if (!searchParams.getEnableFullRestriction()) { scrubSearchResults(ret, searchParams.getSuppressLogs()); }
From 7bb10377c256ea51de5e5bd644df3a40dcc4cccc Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 18:27:06 +0530 Subject: [PATCH 6/9] update libraries --- .../apache/atlas/model/discovery/AtlasSearchResult.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index 96bc0dc5871..963e8506367 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -31,13 +31,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.LinkedHashMap; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; From 15cf5f6c82a26445d01962498d05c9aeae14df12 Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 18:41:52 +0530 Subject: [PATCH 7/9] remove scrub results --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 1622cd3ea4c..7e9cbb9890d 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1149,11 +1149,6 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i entitiesSet.put(header.getGuid(), header); } }); - - ret.setEntities(new 
ArrayList<>(entitiesSet.values())); - if (!searchParams.getEnableFullRestriction()) { - scrubSearchResults(ret, searchParams.getSuppressLogs()); - } } // Non-recursive collapse processing From b79aebd2ed0784aa315a09614b65bad882b0d6dc Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 19:38:44 +0530 Subject: [PATCH 8/9] add branch --- .github/workflows/maven.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index f8a09b55899..e697d640ab9 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,6 +26,7 @@ on: - development - master - lineageondemand + - janusgraphOptimisation jobs: build: From 87745e84632887913da6f9507e868e430dad16e9 Mon Sep 17 00:00:00 2001 From: aarshi Date: Mon, 18 Nov 2024 20:11:17 +0530 Subject: [PATCH 9/9] undo workflow --- .github/workflows/maven.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index a8986bda5a4..90bc81e6d9a 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,7 +26,6 @@ on: - development - master - master - - lineageondemand jobs: