Skip to content

Commit

Permalink
Merge pull request #3758 from atlanhq/revert-3757-janusgraphOptimisation
Browse files Browse the repository at this point in the history
NOC-87266 | Revert : Janusgraph optimisation"
  • Loading branch information
aarshi0301 authored Nov 18, 2024
2 parents be38450 + 6f5c1cf commit 236228b
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.*;
Expand All @@ -87,8 +84,6 @@
public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();
private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores

private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;
Expand Down Expand Up @@ -1076,108 +1071,78 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

@SuppressWarnings("rawtypes")
private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
boolean showSearchScore = searchParams.getShowSearchScore();
List<Result> results = new ArrayList<>();
try {
if(LOG.isDebugEnabled()){
LOG.debug("Preparing search results for ({})", ret.getSearchParameters());
}
Iterator<Result> iterator = indexQueryResult.getIterator();
boolean showSearchScore = searchParams.getShowSearchScore();
if (iterator == null) {
return;
}

// Collect results for batch processing
Iterator<Result> iterator = indexQueryResult.getIterator();
while (iterator != null && iterator.hasNext()) {
results.add(iterator.next());
}
while (iterator.hasNext()) {
Result result = iterator.next();
AtlasVertex vertex = result.getVertex();

// Batch fetch vertices
List<AtlasVertex> vertices = results.stream()
.map(Result::getVertex)
.filter(Objects::nonNull)
.collect(Collectors.toList());

// Use ConcurrentHashMap for thread-safe access
ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>();

// Run vertex processing in limited parallel threads
CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> {
String guid = vertex.getProperty("__guid", String.class);
headers.computeIfAbsent(guid, k -> {
try {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if (RequestContext.get().includeClassifications()) {
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
return header;
} catch (AtlasBaseException e) {
throw new RuntimeException("Failed to process vertex with GUID: " + guid, e);
if (vertex == null) {
LOG.warn("vertex in null");
continue;
}
});
}), CUSTOMTHREADPOOL).join();

// Process results and handle collapse in parallel
results.parallelStream().forEach(result -> {
AtlasVertex vertex = result.getVertex();
if (vertex == null) return;
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(ret.getSearchParameters());

Set<String> collapseResultAttributes = new HashSet<>();
if (searchParams.getCollapseAttributes() != null) {
collapseResultAttributes.addAll(searchParams.getCollapseAttributes());
} else {
collapseResultAttributes = resultAttributes;
}

String guid = vertex.getProperty("__guid", String.class);
AtlasEntityHeader header = headers.get(guid);
if (searchParams.getCollapseRelationAttributes() != null) {
RequestContext.get().getRelationAttrsForSearch().clear();
RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes());
}

if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);

if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse;
try {
collapse = processCollapseResults(result, searchParams, resultAttributes);
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}
}

if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addEntity(header);
}

if (header != null) {
entitiesSet.put(header.getGuid(), header);
}
});
}

// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
} catch (Exception e) {
throw e;
}

return collapse;
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down

0 comments on commit 236228b

Please sign in to comment.