Skip to content

Commit

Permalink
Merge pull request #3779 from atlanhq/dev/janusoptimisation
Browse files Browse the repository at this point in the history
Dev/janusoptimisation
  • Loading branch information
aarshi0301 authored Nov 20, 2024
2 parents cf90e1c + 98a41e6 commit afe1a35
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 55 deletions.
1 change: 0 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ on:
- development
- master
- dg1908
- janus-optimisation

jobs:
build:
Expand Down
15 changes: 15 additions & 0 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,22 @@ public enum AtlasConfiguration {
HERACLES_API_SERVER_URL("atlas.heracles.api.service.url", "http://heracles-service.heracles.svc.cluster.local"),

INDEXSEARCH_ASYNC_SEARCH_KEEP_ALIVE_TIME_IN_SECONDS("atlas.indexsearch.async.search.keep.alive.time.in.seconds", 300),

/**
* hits elastic search async API
*/
ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false),

/***
* enables parallel processing of janus graph vertices from cassandra
*/
ENABLE_JANUS_GRAPH_OPTIMISATION("atlas.janus.graph.optimisation.enable", false),

/**
* No. of threads to be spawned for parallel processing
*/
THREADS_TO_BE_SPAWNED("atlas.janus.graph.optimisation.thread_count", (Runtime.getRuntime().availableProcessors())/2),
FETCH_COLLAPSED_RESULT("atlas.indexsearch.fetch.collapsed.result", true),
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;
import java.util.*;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.*;
Expand All @@ -84,6 +87,7 @@
public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AtlasConfiguration.THREADS_TO_BE_SPAWNED.getInt()); // Use half of available cores

private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;
Expand Down Expand Up @@ -1002,7 +1006,7 @@ public AtlasSearchResult directIndexSearch(SearchParams searchParams) throws Atl
return null;
}
RequestContext.get().endMetricRecord(elasticSearchQueryMetric);
prepareSearchResult(ret, indexQueryResult, resultAttributes, true);
prepareSearchResult(ret, indexQueryResult, resultAttributes, AtlasConfiguration.FETCH_COLLAPSED_RESULT.getBoolean());

ret.setAggregations(indexQueryResult.getAggregationMap());
ret.setApproximateCount(indexQuery.vertexTotals());
Expand Down Expand Up @@ -1071,7 +1075,84 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
@SuppressWarnings("rawtypes")
private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
boolean showSearchScore = searchParams.getShowSearchScore();
List<Result> results = new ArrayList<>();

// Collect results for batch processing
Iterator<Result> iterator = indexQueryResult.getIterator();
while (iterator != null && iterator.hasNext()) {
results.add(iterator.next());
}

// Batch fetch vertices
List<AtlasVertex> vertices = results.stream()
.map(Result::getVertex)
.filter(Objects::nonNull)
.collect(Collectors.toList());

// Use ConcurrentHashMap for thread-safe access
ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>();

// Run vertex processing in limited parallel threads
CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> {
String guid = vertex.getProperty("__guid", String.class);
headers.computeIfAbsent(guid, k -> {
try {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if (RequestContext.get().includeClassifications()) {
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
return header;
} catch (AtlasBaseException e) {
throw new RuntimeException("Failed to process vertex with GUID: " + guid, e);
}
});
}), CUSTOMTHREADPOOL).join();

// Process results and handle collapse in parallel
results.parallelStream().forEach(result -> {
AtlasVertex vertex = result.getVertex();
if (vertex == null) return;

String guid = vertex.getProperty("__guid", String.class);
AtlasEntityHeader header = headers.get(guid);

if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}

if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse;
try {
collapse = processCollapseResults(result, searchParams, resultAttributes);
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
}

if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}

if (header != null) {
entitiesSet.put(header.getGuid(), header);
}
});
ret.setEntities(new ArrayList<>(entitiesSet.values()));
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
Expand Down Expand Up @@ -1121,7 +1202,7 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);
prepareSearchResultSync(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
Expand All @@ -1140,11 +1221,47 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i
ret.addEntity(header);
}
} catch (Exception e) {
throw e;
throw e;
}
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) {
LOG.debug("enabled janusGraphOptimisation");
prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
}
prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
}
// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
}

return collapse;
}

private Map<String, Object> getMap(String key, Object value) {
Map<String, Object> map = new HashMap<>();
map.put(key, value);
Expand Down
Loading

0 comments on commit afe1a35

Please sign in to comment.