diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index 963e8506367..96bc0dc5871 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -31,7 +31,13 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.LinkedHashMap; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 7e9cbb9890d..5c66f77e057 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -70,9 +70,6 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; @@ -87,8 +84,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; - private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors(); - private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores private final AtlasGraph graph; private final EntityGraphRetriever entityRetriever; @@ -1076,108 +1071,78 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro } } - @SuppressWarnings("rawtypes") private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { SearchParams searchParams = ret.getSearchParameters(); - boolean showSearchScore = searchParams.getShowSearchScore(); - List results = new ArrayList<>(); + try { + if(LOG.isDebugEnabled()){ + LOG.debug("Preparing search results for ({})", ret.getSearchParameters()); + } + Iterator iterator = indexQueryResult.getIterator(); + boolean showSearchScore = searchParams.getShowSearchScore(); + if (iterator == null) { + return; + } - // Collect results for batch processing - Iterator iterator = indexQueryResult.getIterator(); - while (iterator != null && iterator.hasNext()) { - results.add(iterator.next()); - } + while (iterator.hasNext()) { + Result result = iterator.next(); + AtlasVertex vertex = result.getVertex(); - // Batch fetch vertices - List vertices = results.stream() - .map(Result::getVertex) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - // Use ConcurrentHashMap for thread-safe access - ConcurrentHashMap headers = new ConcurrentHashMap<>(); - ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); - - // Run vertex processing in limited parallel threads - CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { - String guid = vertex.getProperty("__guid", String.class); - headers.computeIfAbsent(guid, k -> { - try { - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if (RequestContext.get().includeClassifications()) { - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - return header; - } catch (AtlasBaseException e) { - throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); + if (vertex == null) { + LOG.warn("vertex in null"); + continue; } - }); - }), CUSTOMTHREADPOOL).join(); - // Process results and handle collapse in parallel - results.parallelStream().forEach(result -> { - AtlasVertex vertex = result.getVertex(); - if (vertex == null) return; + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if(RequestContext.get().includeClassifications()){ + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + if (showSearchScore) { + ret.addEntityScore(header.getGuid(), result.getScore()); + } + if (fetchCollapsedResults) { + Map collapse = new HashMap<>(); + + Set collapseKeys = result.getCollapseKeys(); + for (String collapseKey : collapseKeys) { + AtlasSearchResult collapseRet = new AtlasSearchResult(); + collapseRet.setSearchParameters(ret.getSearchParameters()); + + Set collapseResultAttributes = new HashSet<>(); + if (searchParams.getCollapseAttributes() != null) { + collapseResultAttributes.addAll(searchParams.getCollapseAttributes()); + } else { + collapseResultAttributes = resultAttributes; + } - String guid = vertex.getProperty("__guid", String.class); - AtlasEntityHeader header = headers.get(guid); + if (searchParams.getCollapseRelationAttributes() != null) { + RequestContext.get().getRelationAttrsForSearch().clear(); + RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes()); + } - if (showSearchScore) { - ret.addEntityScore(header.getGuid(), result.getScore()); - } + DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); + collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); + prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false); - if (fetchCollapsedResults) { - Map collapse; - try { - collapse = processCollapseResults(result, searchParams, resultAttributes); - } catch (AtlasBaseException e) { - throw new RuntimeException(e); + collapseRet.setSearchParameters(null); + collapse.put(collapseKey, collapseRet); + } + if (!collapse.isEmpty()) { + header.setCollapse(collapse); + } } - if (!collapse.isEmpty()) { - header.setCollapse(collapse); + if (searchParams.getShowSearchMetadata()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); + ret.addSort(header.getGuid(), result.getSort()); + } else if (searchParams.getShowHighlights()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); } - } - if (searchParams.getShowSearchMetadata()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); - ret.addSort(header.getGuid(), result.getSort()); - } else if (searchParams.getShowHighlights()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); + ret.addEntity(header); } - - if (header != null) { - entitiesSet.put(header.getGuid(), header); - } - }); - } - - // Non-recursive collapse processing - private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { - Map collapse = new HashMap<>(); - Set collapseKeys = result.getCollapseKeys(); - - for (String collapseKey : collapseKeys) { - AtlasSearchResult collapseRet = new AtlasSearchResult(); - collapseRet.setSearchParameters(searchParams); - Set collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes)); - DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); - collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); - - // Directly iterate over collapse vertices - Iterator iterator = indexQueryCollapsedResult.getIterator(); - while (iterator != null && iterator.hasNext()) { - Result collapseResult = iterator.next(); - AtlasVertex collapseVertex = collapseResult.getVertex(); - if (collapseVertex == null) continue; - - AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes); - collapseRet.addEntity(collapseHeader); - } - - collapse.put(collapseKey, collapseRet); + } catch (Exception e) { + throw e; } - - return collapse; + scrubSearchResults(ret, searchParams.getSuppressLogs()); } private Map getMap(String key, Object value) {