|
70 | 70 | import javax.script.ScriptEngine;
|
71 | 71 | import javax.script.ScriptException;
|
72 | 72 | import java.util.*;
|
73 |
| -import java.util.concurrent.CompletableFuture; |
74 |
| -import java.util.concurrent.ConcurrentHashMap; |
75 |
| -import java.util.concurrent.ForkJoinPool; |
76 | 73 | import java.util.stream.Collectors;
|
77 | 74 |
|
78 | 75 | import static org.apache.atlas.AtlasErrorCode.*;
|
|
87 | 84 | public class EntityDiscoveryService implements AtlasDiscoveryService {
|
88 | 85 | private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
|
89 | 86 | private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
|
90 |
| - private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors(); |
91 |
| - private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores |
92 | 87 |
|
93 | 88 | private final AtlasGraph graph;
|
94 | 89 | private final EntityGraphRetriever entityRetriever;
|
@@ -1076,108 +1071,78 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
|
1076 | 1071 | }
|
1077 | 1072 | }
|
1078 | 1073 |
|
1079 |
| - @SuppressWarnings("rawtypes") |
1080 | 1074 | private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
|
1081 | 1075 | SearchParams searchParams = ret.getSearchParameters();
|
1082 |
| - boolean showSearchScore = searchParams.getShowSearchScore(); |
1083 |
| - List<Result> results = new ArrayList<>(); |
| 1076 | + try { |
| 1077 | + if(LOG.isDebugEnabled()){ |
| 1078 | + LOG.debug("Preparing search results for ({})", ret.getSearchParameters()); |
| 1079 | + } |
| 1080 | + Iterator<Result> iterator = indexQueryResult.getIterator(); |
| 1081 | + boolean showSearchScore = searchParams.getShowSearchScore(); |
| 1082 | + if (iterator == null) { |
| 1083 | + return; |
| 1084 | + } |
1084 | 1085 |
|
1085 |
| - // Collect results for batch processing |
1086 |
| - Iterator<Result> iterator = indexQueryResult.getIterator(); |
1087 |
| - while (iterator != null && iterator.hasNext()) { |
1088 |
| - results.add(iterator.next()); |
1089 |
| - } |
| 1086 | + while (iterator.hasNext()) { |
| 1087 | + Result result = iterator.next(); |
| 1088 | + AtlasVertex vertex = result.getVertex(); |
1090 | 1089 |
|
1091 |
| - // Batch fetch vertices |
1092 |
| - List<AtlasVertex> vertices = results.stream() |
1093 |
| - .map(Result::getVertex) |
1094 |
| - .filter(Objects::nonNull) |
1095 |
| - .collect(Collectors.toList()); |
1096 |
| - |
1097 |
| - // Use ConcurrentHashMap for thread-safe access |
1098 |
| - ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>(); |
1099 |
| - ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>(); |
1100 |
| - |
1101 |
| - // Run vertex processing in limited parallel threads |
1102 |
| - CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { |
1103 |
| - String guid = vertex.getProperty("__guid", String.class); |
1104 |
| - headers.computeIfAbsent(guid, k -> { |
1105 |
| - try { |
1106 |
| - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); |
1107 |
| - if (RequestContext.get().includeClassifications()) { |
1108 |
| - header.setClassifications(entityRetriever.getAllClassifications(vertex)); |
1109 |
| - } |
1110 |
| - return header; |
1111 |
| - } catch (AtlasBaseException e) { |
1112 |
| - throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); |
| 1090 | + if (vertex == null) { |
| 1091 | + LOG.warn("vertex in null"); |
| 1092 | + continue; |
1113 | 1093 | }
|
1114 |
| - }); |
1115 |
| - }), CUSTOMTHREADPOOL).join(); |
1116 | 1094 |
|
1117 |
| - // Process results and handle collapse in parallel |
1118 |
| - results.parallelStream().forEach(result -> { |
1119 |
| - AtlasVertex vertex = result.getVertex(); |
1120 |
| - if (vertex == null) return; |
| 1095 | + AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); |
| 1096 | + if(RequestContext.get().includeClassifications()){ |
| 1097 | + header.setClassifications(entityRetriever.getAllClassifications(vertex)); |
| 1098 | + } |
| 1099 | + if (showSearchScore) { |
| 1100 | + ret.addEntityScore(header.getGuid(), result.getScore()); |
| 1101 | + } |
| 1102 | + if (fetchCollapsedResults) { |
| 1103 | + Map<String, AtlasSearchResult> collapse = new HashMap<>(); |
| 1104 | + |
| 1105 | + Set<String> collapseKeys = result.getCollapseKeys(); |
| 1106 | + for (String collapseKey : collapseKeys) { |
| 1107 | + AtlasSearchResult collapseRet = new AtlasSearchResult(); |
| 1108 | + collapseRet.setSearchParameters(ret.getSearchParameters()); |
| 1109 | + |
| 1110 | + Set<String> collapseResultAttributes = new HashSet<>(); |
| 1111 | + if (searchParams.getCollapseAttributes() != null) { |
| 1112 | + collapseResultAttributes.addAll(searchParams.getCollapseAttributes()); |
| 1113 | + } else { |
| 1114 | + collapseResultAttributes = resultAttributes; |
| 1115 | + } |
1121 | 1116 |
|
1122 |
| - String guid = vertex.getProperty("__guid", String.class); |
1123 |
| - AtlasEntityHeader header = headers.get(guid); |
| 1117 | + if (searchParams.getCollapseRelationAttributes() != null) { |
| 1118 | + RequestContext.get().getRelationAttrsForSearch().clear(); |
| 1119 | + RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes()); |
| 1120 | + } |
1124 | 1121 |
|
1125 |
| - if (showSearchScore) { |
1126 |
| - ret.addEntityScore(header.getGuid(), result.getScore()); |
1127 |
| - } |
| 1122 | + DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); |
| 1123 | + collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); |
| 1124 | + prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false); |
1128 | 1125 |
|
1129 |
| - if (fetchCollapsedResults) { |
1130 |
| - Map<String, AtlasSearchResult> collapse; |
1131 |
| - try { |
1132 |
| - collapse = processCollapseResults(result, searchParams, resultAttributes); |
1133 |
| - } catch (AtlasBaseException e) { |
1134 |
| - throw new RuntimeException(e); |
| 1126 | + collapseRet.setSearchParameters(null); |
| 1127 | + collapse.put(collapseKey, collapseRet); |
| 1128 | + } |
| 1129 | + if (!collapse.isEmpty()) { |
| 1130 | + header.setCollapse(collapse); |
| 1131 | + } |
1135 | 1132 | }
|
1136 |
| - if (!collapse.isEmpty()) { |
1137 |
| - header.setCollapse(collapse); |
| 1133 | + if (searchParams.getShowSearchMetadata()) { |
| 1134 | + ret.addHighlights(header.getGuid(), result.getHighLights()); |
| 1135 | + ret.addSort(header.getGuid(), result.getSort()); |
| 1136 | + } else if (searchParams.getShowHighlights()) { |
| 1137 | + ret.addHighlights(header.getGuid(), result.getHighLights()); |
1138 | 1138 | }
|
1139 |
| - } |
1140 | 1139 |
|
1141 |
| - if (searchParams.getShowSearchMetadata()) { |
1142 |
| - ret.addHighlights(header.getGuid(), result.getHighLights()); |
1143 |
| - ret.addSort(header.getGuid(), result.getSort()); |
1144 |
| - } else if (searchParams.getShowHighlights()) { |
1145 |
| - ret.addHighlights(header.getGuid(), result.getHighLights()); |
| 1140 | + ret.addEntity(header); |
1146 | 1141 | }
|
1147 |
| - |
1148 |
| - if (header != null) { |
1149 |
| - entitiesSet.put(header.getGuid(), header); |
1150 |
| - } |
1151 |
| - }); |
1152 |
| - } |
1153 |
| - |
1154 |
| - // Non-recursive collapse processing |
1155 |
| - private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException { |
1156 |
| - Map<String, AtlasSearchResult> collapse = new HashMap<>(); |
1157 |
| - Set<String> collapseKeys = result.getCollapseKeys(); |
1158 |
| - |
1159 |
| - for (String collapseKey : collapseKeys) { |
1160 |
| - AtlasSearchResult collapseRet = new AtlasSearchResult(); |
1161 |
| - collapseRet.setSearchParameters(searchParams); |
1162 |
| - Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes)); |
1163 |
| - DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); |
1164 |
| - collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); |
1165 |
| - |
1166 |
| - // Directly iterate over collapse vertices |
1167 |
| - Iterator<Result> iterator = indexQueryCollapsedResult.getIterator(); |
1168 |
| - while (iterator != null && iterator.hasNext()) { |
1169 |
| - Result collapseResult = iterator.next(); |
1170 |
| - AtlasVertex collapseVertex = collapseResult.getVertex(); |
1171 |
| - if (collapseVertex == null) continue; |
1172 |
| - |
1173 |
| - AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes); |
1174 |
| - collapseRet.addEntity(collapseHeader); |
1175 |
| - } |
1176 |
| - |
1177 |
| - collapse.put(collapseKey, collapseRet); |
| 1142 | + } catch (Exception e) { |
| 1143 | + throw e; |
1178 | 1144 | }
|
1179 |
| - |
1180 |
| - return collapse; |
| 1145 | + scrubSearchResults(ret, searchParams.getSuppressLogs()); |
1181 | 1146 | }
|
1182 | 1147 |
|
1183 | 1148 | private Map<String, Object> getMap(String key, Object value) {
|
|
0 commit comments