diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 562a6d9322d..90bc81e6d9a 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -25,6 +25,8 @@ on: - beta - development - master + - master + jobs: build: diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index 96bc0dc5871..963e8506367 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -31,13 +31,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.LinkedHashMap; +import java.util.*; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5c66f77e057..7e9cbb9890d 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -70,6 +70,9 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; @@ -84,6 +87,8 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; + private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors(); + private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores private final AtlasGraph graph; private final EntityGraphRetriever entityRetriever; @@ -1071,78 +1076,108 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro } } + @SuppressWarnings("rawtypes") private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { SearchParams searchParams = ret.getSearchParameters(); - try { - if(LOG.isDebugEnabled()){ - LOG.debug("Preparing search results for ({})", ret.getSearchParameters()); - } - Iterator iterator = indexQueryResult.getIterator(); - boolean showSearchScore = searchParams.getShowSearchScore(); - if (iterator == null) { - return; - } + boolean showSearchScore = searchParams.getShowSearchScore(); + List results = new ArrayList<>(); - while (iterator.hasNext()) { - Result result = iterator.next(); - AtlasVertex vertex = result.getVertex(); + // Collect results for batch processing + Iterator iterator = indexQueryResult.getIterator(); + while (iterator != null && iterator.hasNext()) { + results.add(iterator.next()); + } - if (vertex == null) { - LOG.warn("vertex in null"); - continue; + // Batch fetch vertices + List vertices = results.stream() + .map(Result::getVertex) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + // Use ConcurrentHashMap for thread-safe access + ConcurrentHashMap headers = new ConcurrentHashMap<>(); + ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); + + // Run vertex processing in limited parallel threads + CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { + String guid = vertex.getProperty("__guid", String.class); + headers.computeIfAbsent(guid, k -> { + try { + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + if (RequestContext.get().includeClassifications()) { + header.setClassifications(entityRetriever.getAllClassifications(vertex)); + } + return header; + } catch (AtlasBaseException e) { + throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); } + }); + }), CUSTOMTHREADPOOL).join(); - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if(RequestContext.get().includeClassifications()){ - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - if (showSearchScore) { - ret.addEntityScore(header.getGuid(), result.getScore()); - } - if (fetchCollapsedResults) { - Map collapse = new HashMap<>(); - - Set collapseKeys = result.getCollapseKeys(); - for (String collapseKey : collapseKeys) { - AtlasSearchResult collapseRet = new AtlasSearchResult(); - collapseRet.setSearchParameters(ret.getSearchParameters()); - - Set collapseResultAttributes = new HashSet<>(); - if (searchParams.getCollapseAttributes() != null) { - collapseResultAttributes.addAll(searchParams.getCollapseAttributes()); - } else { - collapseResultAttributes = resultAttributes; - } + // Process results and handle collapse in parallel + results.parallelStream().forEach(result -> { + AtlasVertex vertex = result.getVertex(); + if (vertex == null) return; - if (searchParams.getCollapseRelationAttributes() != null) { - RequestContext.get().getRelationAttrsForSearch().clear(); - RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes()); - } + String guid = vertex.getProperty("__guid", String.class); + AtlasEntityHeader header = headers.get(guid); - DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); - collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); - prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false); + if (showSearchScore) { + ret.addEntityScore(header.getGuid(), result.getScore()); + } - collapseRet.setSearchParameters(null); - collapse.put(collapseKey, collapseRet); - } - if (!collapse.isEmpty()) { - header.setCollapse(collapse); - } + if (fetchCollapsedResults) { + Map collapse; + try { + collapse = processCollapseResults(result, searchParams, resultAttributes); + } catch (AtlasBaseException e) { + throw new RuntimeException(e); } - if (searchParams.getShowSearchMetadata()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); - ret.addSort(header.getGuid(), result.getSort()); - } else if (searchParams.getShowHighlights()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); + if (!collapse.isEmpty()) { + header.setCollapse(collapse); } + } - ret.addEntity(header); + if (searchParams.getShowSearchMetadata()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); + ret.addSort(header.getGuid(), result.getSort()); + } else if (searchParams.getShowHighlights()) { + ret.addHighlights(header.getGuid(), result.getHighLights()); } - } catch (Exception e) { - throw e; + + if (header != null) { + entitiesSet.put(header.getGuid(), header); + } + }); + } + + // Non-recursive collapse processing + private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { + Map collapse = new HashMap<>(); + Set collapseKeys = result.getCollapseKeys(); + + for (String collapseKey : collapseKeys) { + AtlasSearchResult collapseRet = new AtlasSearchResult(); + collapseRet.setSearchParameters(searchParams); + Set collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes)); + DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); + collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); + + // Directly iterate over collapse vertices + Iterator iterator = indexQueryCollapsedResult.getIterator(); + while (iterator != null && iterator.hasNext()) { + Result collapseResult = iterator.next(); + AtlasVertex collapseVertex = collapseResult.getVertex(); + if (collapseVertex == null) continue; + + AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes); + collapseRet.addEntity(collapseHeader); + } + + collapse.put(collapseKey, collapseRet); } - scrubSearchResults(ret, searchParams.getSuppressLogs()); + + return collapse; } private Map getMap(String key, Object value) {