Skip to content

Commit

Permalink
Merge pull request #3759 from atlanhq/dev/janusgraphOptimisation
Browse files Browse the repository at this point in the history
JanusGraph optimisation
  • Loading branch information
aarshi0301 authored Nov 18, 2024
2 parents c533da8 + 87745e8 commit 1ff820d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 65 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ on:
- beta
- development
- master
- master


jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.*;
Expand All @@ -84,6 +87,8 @@
public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();
private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores

private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;
Expand Down Expand Up @@ -1071,78 +1076,108 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

@SuppressWarnings("rawtypes")
private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
LOG.debug("Preparing search results for ({})", ret.getSearchParameters());
}
Iterator<Result> iterator = indexQueryResult.getIterator();
boolean showSearchScore = searchParams.getShowSearchScore();
if (iterator == null) {
return;
}
boolean showSearchScore = searchParams.getShowSearchScore();
List<Result> results = new ArrayList<>();

while (iterator.hasNext()) {
Result result = iterator.next();
AtlasVertex vertex = result.getVertex();
// Collect results for batch processing
Iterator<Result> iterator = indexQueryResult.getIterator();
while (iterator != null && iterator.hasNext()) {
results.add(iterator.next());
}

if (vertex == null) {
LOG.warn("vertex in null");
continue;
// Batch fetch vertices
List<AtlasVertex> vertices = results.stream()
.map(Result::getVertex)
.filter(Objects::nonNull)
.collect(Collectors.toList());

// Use ConcurrentHashMap for thread-safe access
ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>();

// Run vertex processing in limited parallel threads
CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> {
String guid = vertex.getProperty("__guid", String.class);
headers.computeIfAbsent(guid, k -> {
try {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if (RequestContext.get().includeClassifications()) {
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
return header;
} catch (AtlasBaseException e) {
throw new RuntimeException("Failed to process vertex with GUID: " + guid, e);
}
});
}), CUSTOMTHREADPOOL).join();

AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(ret.getSearchParameters());

Set<String> collapseResultAttributes = new HashSet<>();
if (searchParams.getCollapseAttributes() != null) {
collapseResultAttributes.addAll(searchParams.getCollapseAttributes());
} else {
collapseResultAttributes = resultAttributes;
}
// Process results and handle collapse in parallel
results.parallelStream().forEach(result -> {
AtlasVertex vertex = result.getVertex();
if (vertex == null) return;

if (searchParams.getCollapseRelationAttributes() != null) {
RequestContext.get().getRelationAttrsForSearch().clear();
RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes());
}
String guid = vertex.getProperty("__guid", String.class);
AtlasEntityHeader header = headers.get(guid);

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse;
try {
collapse = processCollapseResults(result, searchParams, resultAttributes);
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
}

ret.addEntity(header);
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}
} catch (Exception e) {
throw e;

if (header != null) {
entitiesSet.put(header.getGuid(), header);
}
});
}

// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
}
scrubSearchResults(ret, searchParams.getSuppressLogs());

return collapse;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down

0 comments on commit 1ff820d

Please sign in to comment.