From 08c0df0f7adfbada8a9a99f54e599974fa869780 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Thu, 21 Nov 2024 18:48:05 +0530 Subject: [PATCH] revert to previous version of mapVertexToAtlasEntityHeader --- .../discovery/EntityDiscoveryService.java | 118 ------ .../store/graph/v2/EntityGraphRetriever.java | 357 ++---------------- 2 files changed, 37 insertions(+), 438 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 29a33d676b..905256da93 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -70,10 +70,6 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ForkJoinPool; -import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; @@ -1075,87 +1071,6 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro } } - @SuppressWarnings("rawtypes") - private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { - SearchParams searchParams = ret.getSearchParameters(); - boolean showSearchScore = searchParams.getShowSearchScore(); - List results = new ArrayList<>(); - - // Collect results for batch processing - Iterator iterator = indexQueryResult.getIterator(); - while (iterator != null && iterator.hasNext()) { - results.add(iterator.next()); - } - - // Batch fetch vertices - List vertices = results.stream() - .map(Result::getVertex) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - - // Use ConcurrentHashMap for thread-safe access - //ConcurrentHashMap headers = new ConcurrentHashMap<>(); - //ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); - - List headers = entityRetriever.mapVerticesToAtlasEntityHeader(vertices, resultAttributes); - // Create a Set entitiesSet = headers.stream().collect(Collectors.toMap(AtlasEntityHeader::getGuid, Function.identity())); - - // Run vertex processing in limited parallel threads - /**CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { - String guid = vertex.getProperty("__guid", String.class); - headers.computeIfAbsent(guid, k -> { - try { - //AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); - if (RequestContext.get().includeClassifications()) { - header.setClassifications(entityRetriever.getAllClassifications(vertex)); - } - return header; - } catch (AtlasBaseException e) { - throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); - } - }); - }), CUSTOMTHREADPOOL).join();*/ - - // Process results and handle collapse in parallel - results.parallelStream().forEach(result -> { - AtlasVertex vertex = result.getVertex(); - if (vertex == null) return; - - String guid = vertex.getProperty("__guid", String.class); - AtlasEntityHeader header = entitiesSet.get(guid); - - if (showSearchScore) { - ret.addEntityScore(header.getGuid(), result.getScore()); - } - - if (fetchCollapsedResults) { - Map collapse; - try { - collapse = processCollapseResults(result, searchParams, resultAttributes); - } catch (AtlasBaseException e) { - throw new RuntimeException(e); - } - if (!collapse.isEmpty()) { - header.setCollapse(collapse); - } - } - - if (searchParams.getShowSearchMetadata()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); - ret.addSort(header.getGuid(), result.getSort()); - } else if (searchParams.getShowHighlights()) { - ret.addHighlights(header.getGuid(), result.getHighLights()); - } - - if (header != null) { - entitiesSet.put(header.getGuid(), header); - } - }); - ret.setEntities(new ArrayList<>(entitiesSet.values())); - scrubSearchResults(ret, searchParams.getSuppressLogs()); - } - private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { SearchParams searchParams = ret.getSearchParameters(); try { @@ -1231,40 +1146,7 @@ private void prepareSearchResultSync(AtlasSearchResult ret, DirectIndexQueryResu } private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException { - if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) { - LOG.debug("enabled janusGraphOptimisation"); - prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); - } else { prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); - } - } - // Non-recursive collapse processing - private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { - Map collapse = new HashMap<>(); - Set collapseKeys = result.getCollapseKeys(); - - for (String collapseKey : collapseKeys) { - AtlasSearchResult collapseRet = new AtlasSearchResult(); - collapseRet.setSearchParameters(searchParams); - Set collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes)); - DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey); - collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount()); - - // Directly iterate over collapse vertices - Iterator iterator = indexQueryCollapsedResult.getIterator(); - while (iterator != null && iterator.hasNext()) { - Result collapseResult = iterator.next(); - AtlasVertex collapseVertex = collapseResult.getVertex(); - if (collapseVertex == null) continue; - - AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes); - collapseRet.addEntity(collapseHeader); - } - - collapse.put(collapseKey, collapseRet); - } - - return collapse; } private Map getMap(String key, Object value) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 3782982bc9..3b8144fed3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -18,7 +18,6 @@ package org.apache.atlas.repository.store.graph.v2; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; @@ -50,8 +49,6 @@ import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; -import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; import org.apache.atlas.type.AtlasEntityType; @@ -70,9 +67,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; -import org.janusgraph.core.JanusGraphMultiVertexQuery; -import org.janusgraph.core.JanusGraphTransaction; -import org.janusgraph.core.JanusGraphVertex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -1015,330 +1009,77 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) return mapVertexToAtlasEntityHeader(entityVertex, Collections.emptySet()); } -// private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { -// AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader"); -// AtlasEntityHeader ret = new AtlasEntityHeader(); -// try { -// String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); -// String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); -// Boolean isIncomplete = isEntityIncomplete(entityVertex); -// -// ret.setTypeName(typeName); -// ret.setGuid(guid); -// ret.setStatus(GraphHelper.getStatus(entityVertex)); -// RequestContext context = RequestContext.get(); -// boolean includeClassifications = context.includeClassifications(); -// boolean includeClassificationNames = context.isIncludeClassificationNames(); -// if(includeClassifications){ -// ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); -// } else if (!includeClassifications && includeClassificationNames) { -// ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); -// } -// ret.setIsIncomplete(isIncomplete); -// ret.setLabels(getLabels(entityVertex)); -// -// ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex)); -// ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex)); -// ret.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex))); -// ret.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex))); -// -// if(RequestContext.get().includeMeanings()) { -// List termAssignmentHeaders = mapAssignedTerms(entityVertex); -// ret.setMeanings(termAssignmentHeaders); -// ret.setMeaningNames( -// termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText) -// .collect(Collectors.toList())); -// } -// AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); -// -// if (entityType != null) { -// for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { -// Object attrValue = getVertexAttribute(entityVertex, headerAttribute); -// -// if (attrValue != null) { -// ret.setAttribute(headerAttribute.getName(), attrValue); -// } -// } -// -// Object displayText = getDisplayText(entityVertex, entityType); -// -// if (displayText != null) { -// ret.setDisplayText(displayText.toString()); -// } -// -// if (CollectionUtils.isNotEmpty(attributes)) { -// for (String attrName : attributes) { -// AtlasAttribute attribute = entityType.getAttribute(attrName); -// -// if (attribute == null) { -// attrName = toNonQualifiedName(attrName); -// -// if (ret.hasAttribute(attrName)) { -// continue; -// } -// -// attribute = entityType.getAttribute(attrName); -// -// if (attribute == null) { -// attribute = entityType.getRelationshipAttribute(attrName, null); -// } -// } -// -// Object attrValue = getVertexAttribute(entityVertex, attribute); -// -// if (attrValue != null) { -// ret.setAttribute(attrName, attrValue); -// } -// } -// } -// } -// } -// finally { -// RequestContext.get().endMetricRecord(metricRecorder); -// } -// return ret; -// } - - private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVertexToAtlasEntityHeader"); - AtlasEntityHeader ret = new AtlasEntityHeader(); - try { - // Batch fetch all properties - Map vertexProperties = batchGetProperties(entityVertex); - - // Common properties - String typeName = (String) getCommonProperty(vertexProperties, Constants.TYPE_NAME_PROPERTY_KEY); - String guid = (String) getCommonProperty(vertexProperties, GUID_PROPERTY_KEY); - Long createTime = (Long) getCommonProperty(vertexProperties, TIMESTAMP_PROPERTY_KEY); - Long updateTime = (Long) getCommonProperty(vertexProperties, MODIFICATION_TIMESTAMP_PROPERTY_KEY); + String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); + String guid = entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); Boolean isIncomplete = isEntityIncomplete(entityVertex); - // Set properties in header ret.setTypeName(typeName); ret.setGuid(guid); ret.setStatus(GraphHelper.getStatus(entityVertex)); - ret.setIsIncomplete(isIncomplete); - ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex)); - ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex)); - ret.setCreateTime(createTime != null ? new Date(createTime) : null); - ret.setUpdateTime(updateTime != null ? new Date(updateTime) : null); - ret.setLabels(getLabels(entityVertex)); - - // Classifications RequestContext context = RequestContext.get(); - if (context.includeClassifications() || context.isIncludeClassificationNames()) { + boolean includeClassifications = context.includeClassifications(); + boolean includeClassificationNames = context.isIncludeClassificationNames(); + if(includeClassifications){ + ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); + } else if (!includeClassifications && includeClassificationNames) { ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); } + ret.setIsIncomplete(isIncomplete); + ret.setLabels(getLabels(entityVertex)); - // Meanings - if (context.includeMeanings()) { + ret.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex)); + ret.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex)); + ret.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex))); + ret.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex))); + + if(RequestContext.get().includeMeanings()) { List termAssignmentHeaders = mapAssignedTerms(entityVertex); ret.setMeanings(termAssignmentHeaders); - ret.setMeaningNames(termAssignmentHeaders.stream() - .map(AtlasTermAssignmentHeader::getDisplayText) - .collect(Collectors.toList())); + ret.setMeaningNames( + termAssignmentHeaders.stream().map(AtlasTermAssignmentHeader::getDisplayText) + .collect(Collectors.toList())); } - - // Process entity type and attributes AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + if (entityType != null) { - // Header attributes for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { - Object attrValue = getVertexAttributeFromBatch(vertexProperties, headerAttribute); + Object attrValue = getVertexAttribute(entityVertex, headerAttribute); + if (attrValue != null) { ret.setAttribute(headerAttribute.getName(), attrValue); } } - // Display text Object displayText = getDisplayText(entityVertex, entityType); + if (displayText != null) { ret.setDisplayText(displayText.toString()); } - // Additional attributes if (CollectionUtils.isNotEmpty(attributes)) { for (String attrName : attributes) { - AtlasAttribute attribute = getEntityOrRelationshipAttribute(entityType, attrName); - if (attribute != null) { - Object attrValue = getVertexAttributeFromBatch(vertexProperties, attribute); - if (attrValue != null) { - ret.setAttribute(attrName, attrValue); - } - } - } - } - } - } finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - - return ret; - } - - - public List mapVerticesToAtlasEntityHeader(List entityVertices, Set attributes) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVerticesToAtlasEntityHeader"); - - List results = new ArrayList<>(); - - try { - // Convert AtlasVertex to JanusGraphVertex - - // Use multiQuery for optimized property fetching - Map> multiQueryResults = getBatchPropertiesWithMultiQuery(entityVertices, attributes); - Map map = getJanusGraphVerticesMap(entityVertices); - multiQueryResults.forEach((janusGraphVertex, vertexProperties) -> { - AtlasEntityHeader ret = new AtlasEntityHeader(); - - // Populate AtlasEntityHeader with fetched properties - try { - populateEntityHeader(ret, map.get(janusGraphVertex), vertexProperties, attributes); - } catch (AtlasBaseException e) { - throw new RuntimeException(e); - } - results.add(ret); - }); - - } finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - - return results; - } - - private Map getJanusGraphVerticesMap(List vertices) { - Map resultMap = new HashMap<>(); + AtlasAttribute attribute = entityType.getAttribute(attrName); - for (AtlasVertex vertex : vertices) { - if (vertex instanceof AtlasJanusVertex) { - Object wrappedElement = ((AtlasJanusVertex) vertex).getWrappedElement(); + if (attribute == null) { + attrName = toNonQualifiedName(attrName); - if (wrappedElement instanceof JanusGraphVertex) { - resultMap.put((JanusGraphVertex) wrappedElement, vertex); - } else { - throw new IllegalArgumentException("Wrapped element is not an instance of JanusGraphVertex"); - } - } else { - throw new IllegalArgumentException("Provided vertex is not an instance of AtlasJanusVertex"); - } - } - - return resultMap; - } - - // Helper to convert AtlasVertex to JanusGraphVertex - private List getJanusGraphVertices(List vertices) { - List results = new ArrayList<>(); - for(AtlasVertex vertex : vertices) { - if (((AtlasJanusVertex) vertex).getWrappedElement() instanceof JanusGraphVertex) { - results.add(vertex.getWrappedElement()); - } else { - throw new IllegalArgumentException("Provided vertex is not an instance of JanusGraphVertex"); - } - } - return results; - } - - // Use multiQuery to batch-fetch properties - private Map> getBatchPropertiesWithMultiQuery(List entityVertices, Set attributes) { - Iterable vertices = getJanusGraphVertices(entityVertices); - List target = new ArrayList<>(); - vertices.forEach(target::add); - JanusGraphTransaction transaction = ((AtlasJanusGraph)graph).getTransaction(); - try { - JanusGraphMultiVertexQuery multiQuery = transaction.multiQuery(target); - - Set keys = Sets.newHashSet(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY, - Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY, - Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY); - - keys.addAll(attributes); - - multiQuery.keys(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY, - Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY, - Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, STATE_PROPERTY_KEY); + if (ret.hasAttribute(attrName)) { + continue; + } - Map> vertexPropertiesMap = new HashMap<>(); + attribute = entityType.getAttribute(attrName); - for (JanusGraphVertex vertex : vertices) { - Map properties = new HashMap<>(); - for (String key : keys) { - properties.put(key, vertex.property(key).orElse(null)); - } - vertexPropertiesMap.put(vertex, properties); - } - return vertexPropertiesMap; - } finally { - /*if(transaction != null) { - transaction.commit(); - transaction.close(); - }*/ - } - - } - - // Populate AtlasEntityHeader - private void populateEntityHeader(AtlasEntityHeader ret, AtlasVertex entityVertex, Map vertexProperties, Set attributes) throws AtlasBaseException { - String typeName = (String) vertexProperties.get(Constants.TYPE_NAME_PROPERTY_KEY); - String guid = (String) vertexProperties.get(Constants.GUID_PROPERTY_KEY); - String createdBy = (String) vertexProperties.get(Constants.CREATED_BY_KEY); - String updatedBy = (String) vertexProperties.get(Constants.MODIFIED_BY_KEY); - Long createTime = (Long) vertexProperties.get(Constants.TIMESTAMP_PROPERTY_KEY); - Long updateTime = (Long) vertexProperties.get(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY); - Boolean isIncomplete = isEntityIncomplete(entityVertex); - - ret.setTypeName(typeName); - ret.setGuid(guid); - ret.setStatus(GraphHelper.getStatus(entityVertex)); - ret.setIsIncomplete(isIncomplete); - ret.setCreatedBy(createdBy); - ret.setUpdatedBy(updatedBy); - ret.setCreateTime(createTime != null ? new Date(createTime) : null); - ret.setUpdateTime(updateTime != null ? new Date(updateTime) : null); - ret.setLabels(getLabels(entityVertex)); - - // Classifications - RequestContext context = RequestContext.get(); - if (context.includeClassifications() || context.isIncludeClassificationNames()) { - ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); - } - - // Meanings - if (context.includeMeanings()) { - List termAssignmentHeaders = mapAssignedTerms(entityVertex); - ret.setMeanings(termAssignmentHeaders); - ret.setMeaningNames(termAssignmentHeaders.stream() - .map(AtlasTermAssignmentHeader::getDisplayText) - .collect(Collectors.toList())); - } - - // Process entity type and attributes - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - if (entityType != null) { - // Header attributes - for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { - Object attrValue = getVertexAttributeFromBatch(vertexProperties, headerAttribute); - if (attrValue != null) { - ret.setAttribute(headerAttribute.getName(), attrValue); - } - } + if (attribute == null) { + attribute = entityType.getRelationshipAttribute(attrName, null); + } + } - // Display text - Object displayText = getDisplayText(entityVertex, entityType); - if (displayText != null) { - ret.setDisplayText(displayText.toString()); - } + Object attrValue = getVertexAttribute(entityVertex, attribute); - // Additional attributes - if (CollectionUtils.isNotEmpty(attributes)) { - for (String attrName : attributes) { - AtlasAttribute attribute = getEntityOrRelationshipAttribute(entityType, attrName); - if (attribute != null) { - Object attrValue = getVertexAttributeFromBatch(vertexProperties, attribute); if (attrValue != null) { ret.setAttribute(attrName, attrValue); } @@ -1346,34 +1087,10 @@ private void populateEntityHeader(AtlasEntityHeader ret, AtlasVertex entityVerte } } } - - // Additional properties like classifications, meanings, and attributes... - } - - private Object getCommonProperty(Map vertexProperties, String propertyName) { - if (vertexProperties.get(propertyName) instanceof List) { - return ((List) vertexProperties.get(propertyName)).get(0); - } - return new Object(); - } - /** - * Fetches all properties of a vertex in one call and returns them as a map. - */ - private Map batchGetProperties(AtlasVertex vertex) { - // Use JanusGraph's Gremlin API for efficient property retrieval - return (Map) graph.V(vertex.getId()).valueMap().next(); - } - - /** - * Retrieves a vertex attribute from the pre-fetched batch of properties. - */ - private Object getVertexAttributeFromBatch(Map properties, AtlasAttribute attribute) { - if (properties == null || attribute == null) { - return null; + finally { + RequestContext.get().endMetricRecord(metricRecorder); } - - String propertyKey = attribute.getVertexPropertyName(); - return properties.get(propertyKey); + return ret; } /**