Skip to content

Commit

Permalink
Merge pull request #3783 from atlanhq/revert-GraphRetrieverBeta
Browse files Browse the repository at this point in the history
PLT-2748 revert to previous version of mapVertexToAtlasEntityHeader
  • Loading branch information
sriram-atlan authored Nov 21, 2024
2 parents bbe483c + 08c0df0 commit 61ec4ae
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 438 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.*;
Expand Down Expand Up @@ -1075,87 +1071,6 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

@SuppressWarnings("rawtypes")
private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
boolean showSearchScore = searchParams.getShowSearchScore();
List<Result> results = new ArrayList<>();

// Collect results for batch processing
Iterator<Result> iterator = indexQueryResult.getIterator();
while (iterator != null && iterator.hasNext()) {
results.add(iterator.next());
}

// Batch fetch vertices
List<AtlasVertex> vertices = results.stream()
.map(Result::getVertex)
.filter(Objects::nonNull)
.collect(Collectors.toList());

// Use ConcurrentHashMap for thread-safe access
//ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();
//ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>();

List<AtlasEntityHeader> headers = entityRetriever.mapVerticesToAtlasEntityHeader(vertices, resultAttributes);
// Create a Set<String, AltasEntityHeader based on the GUID
Map<String, AtlasEntityHeader> entitiesSet = headers.stream().collect(Collectors.toMap(AtlasEntityHeader::getGuid, Function.identity()));

// Run vertex processing in limited parallel threads
/**CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> {
String guid = vertex.getProperty("__guid", String.class);
headers.computeIfAbsent(guid, k -> {
try {
//AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if (RequestContext.get().includeClassifications()) {
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
return header;
} catch (AtlasBaseException e) {
throw new RuntimeException("Failed to process vertex with GUID: " + guid, e);
}
});
}), CUSTOMTHREADPOOL).join();*/

// Process results and handle collapse in parallel
results.parallelStream().forEach(result -> {
AtlasVertex vertex = result.getVertex();
if (vertex == null) return;

String guid = vertex.getProperty("__guid", String.class);
AtlasEntityHeader header = entitiesSet.get(guid);

if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}

if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse;
try {
collapse = processCollapseResults(result, searchParams, resultAttributes);
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
}

if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}

if (header != null) {
entitiesSet.put(header.getGuid(), header);
}
});
ret.setEntities(new ArrayList<>(entitiesSet.values()));
scrubSearchResults(ret, searchParams.getSuppressLogs());
}

private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
Expand Down Expand Up @@ -1231,40 +1146,7 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu
}

private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) {
LOG.debug("enabled janusGraphOptimisation");
prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
} else {
prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
}
}
// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
}

return collapse;
}

private Map<String, Object> getMap(String key, Object value) {
Expand Down
Loading

0 comments on commit 61ec4ae

Please sign in to comment.