From 6f1549dc44946043e3e459eb4802de03d317d6b9 Mon Sep 17 00:00:00 2001 From: aarshi Date: Thu, 21 Nov 2024 10:49:53 +0530 Subject: [PATCH 1/4] fix if condition --- .../org/apache/atlas/discovery/EntityDiscoveryService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 3f8155d9c1..561cc456eb 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -1230,8 +1230,9 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) { LOG.debug("enabled janusGraphOptimisation"); prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); + }else { + prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); } - prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); } // Non-recursive collapse processing private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { From 7feef651db9a8be898645da266bb4dd8ca45222a Mon Sep 17 00:00:00 2001 From: aarshi Date: Thu, 21 Nov 2024 12:17:45 +0530 Subject: [PATCH 2/4] remove branch deployment --- .github/workflows/maven.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 6227a0cf64..f8a09b5589 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,7 +26,6 @@ on: - development - master - lineageondemand - - janusoptimisation jobs: build: From bd1ddb085f738478d168d39ce18f19fe3d7777bf Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Thu, 21 Nov 2024 13:04:31 +0530 Subject: [PATCH 3/4] use multi query to improve response time of post elastic query lookups --- .../graphdb/janus/AtlasJanusGraph.java | 11 +- .../discovery/EntityDiscoveryService.java | 24 +-- .../audit/EntityAuditListenerV2.java | 2 +- .../store/graph/v2/EntityGraphRetriever.java | 176 ++++++++++++++++++ 4 files changed, 195 insertions(+), 18 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index 6c5e0c9886..9bd2fbcdcd 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -69,12 +69,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.janusgraph.core.Cardinality; -import org.janusgraph.core.JanusGraph; -import org.janusgraph.core.JanusGraphFactory; -import org.janusgraph.core.JanusGraphIndexQuery; -import org.janusgraph.core.PropertyKey; -import org.janusgraph.core.SchemaViolationException; +import org.janusgraph.core.*; import org.janusgraph.core.schema.JanusGraphIndex; import org.janusgraph.core.schema.JanusGraphManagement; import org.janusgraph.core.schema.Parameter; @@ -696,4 +691,8 @@ public void setEnableCache(boolean enableCache) { public Boolean isCacheEnabled() { return this.janusGraph.isCacheEnabled(); } + + public JanusGraphTransaction getTransaction() { + return this.janusGraph.newThreadBoundTransaction(); + } } diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index efa124a3fa..8694623f0a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -70,9 +70,7 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*; @@ -87,7 +85,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; - private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AtlasConfiguration.THREADS_TO_BE_SPAWNED.getInt()); // Use half of available cores private final AtlasGraph graph; private final EntityGraphRetriever entityRetriever; @@ -1094,15 +1091,19 @@ private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryRes .collect(Collectors.toList()); // Use ConcurrentHashMap for thread-safe access - ConcurrentHashMap headers = new ConcurrentHashMap<>(); - ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); + //ConcurrentHashMap headers = new ConcurrentHashMap<>(); + //ConcurrentHashMap entitiesSet = new ConcurrentHashMap<>(); + + List headers = entityRetriever.mapVerticesToAtlasEntityHeader(vertices, resultAttributes); + // Create a Set entitiesSet = headers.stream().collect(Collectors.toMap(AtlasEntityHeader::getGuid, Function.identity())); // Run vertex processing in limited parallel threads - CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { + /**CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> { String guid = vertex.getProperty("__guid", String.class); headers.computeIfAbsent(guid, k -> { try { - AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); + //AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes); if (RequestContext.get().includeClassifications()) { header.setClassifications(entityRetriever.getAllClassifications(vertex)); } @@ -1111,7 +1112,7 @@ private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryRes throw new RuntimeException("Failed to process vertex with GUID: " + guid, e); } }); - }), CUSTOMTHREADPOOL).join(); + }), CUSTOMTHREADPOOL).join();*/ // Process results and handle collapse in parallel results.parallelStream().forEach(result -> { @@ -1119,7 +1120,7 @@ private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryRes if (vertex == null) return; String guid = vertex.getProperty("__guid", String.class); - AtlasEntityHeader header = headers.get(guid); + AtlasEntityHeader header = entitiesSet.get(guid); if (showSearchScore) { ret.addEntityScore(header.getGuid(), result.getScore()); @@ -1230,8 +1231,9 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) { LOG.debug("enabled janusGraphOptimisation"); prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); + } else { + prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); } - prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults); } // Non-recursive collapse processing private Map processCollapseResults(Result result, SearchParams searchParams, Set resultAttributes) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java index d0dbecad38..a0e9d6eed6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -493,7 +493,7 @@ private EntityAuditEventV2 createEvent(EntityAuditEventV2 entityAuditEventV2, At String qualifiedName = (String) originalEntity.getAttribute(QUALIFIED_NAME); entityAuditEventV2.setEntityQualifiedName(AtlasType.toJson(qualifiedName)); } else { - String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME); + String qualifiedName = ((List)entity.getAttribute(QUALIFIED_NAME)).get(0).toString(); entityAuditEventV2.setEntityQualifiedName(AtlasType.toJson(qualifiedName)); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 0f11a100fc..16053afab8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.store.graph.v2; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; @@ -49,6 +50,8 @@ import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; +import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; import org.apache.atlas.type.AtlasEntityType; @@ -67,6 +70,9 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.janusgraph.core.JanusGraphMultiVertexQuery; +import org.janusgraph.core.JanusGraphTransaction; +import org.janusgraph.core.JanusGraphVertex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -1176,6 +1182,176 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, } + public List mapVerticesToAtlasEntityHeader(List entityVertices, Set attributes) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVerticesToAtlasEntityHeader"); + + List results = new ArrayList<>(); + + try { + // Convert AtlasVertex to JanusGraphVertex + + // Use multiQuery for optimized property fetching + Map> multiQueryResults = getBatchPropertiesWithMultiQuery(entityVertices, attributes); + Map map = getJanusGraphVerticesMap(entityVertices); + multiQueryResults.forEach((janusGraphVertex, vertexProperties) -> { + AtlasEntityHeader ret = new AtlasEntityHeader(); + + // Populate AtlasEntityHeader with fetched properties + try { + populateEntityHeader(ret, map.get(janusGraphVertex), vertexProperties, attributes); + } catch (AtlasBaseException e) { + throw new RuntimeException(e); + } + results.add(ret); + }); + + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + return results; + } + + private Map getJanusGraphVerticesMap(List vertices) { + Map resultMap = new HashMap<>(); + + for (AtlasVertex vertex : vertices) { + if (vertex instanceof AtlasJanusVertex) { + Object wrappedElement = ((AtlasJanusVertex) vertex).getWrappedElement(); + + if (wrappedElement instanceof JanusGraphVertex) { + resultMap.put((JanusGraphVertex) wrappedElement, vertex); + } else { + throw new IllegalArgumentException("Wrapped element is not an instance of JanusGraphVertex"); + } + } else { + throw new IllegalArgumentException("Provided vertex is not an instance of AtlasJanusVertex"); + } + } + + return resultMap; + } + + // Helper to convert AtlasVertex to JanusGraphVertex + private List getJanusGraphVertices(List vertices) { + List results = new ArrayList<>(); + for(AtlasVertex vertex : vertices) { + if (((AtlasJanusVertex) vertex).getWrappedElement() instanceof JanusGraphVertex) { + results.add(vertex.getWrappedElement()); + } else { + throw new IllegalArgumentException("Provided vertex is not an instance of JanusGraphVertex"); + } + } + return results; + } + + // Use multiQuery to batch-fetch properties + private Map> getBatchPropertiesWithMultiQuery(List entityVertices, Set attributes) { + Iterable vertices = getJanusGraphVertices(entityVertices); + List target = new ArrayList<>(); + vertices.forEach(target::add); + JanusGraphTransaction transaction = ((AtlasJanusGraph)graph).getTransaction(); + try { + JanusGraphMultiVertexQuery multiQuery = transaction.multiQuery(target); + + Set keys = Sets.newHashSet(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY, + Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY, + Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY); + + keys.addAll(attributes); + + multiQuery.keys(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY, + Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY, + Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, STATE_PROPERTY_KEY); + + Map> vertexPropertiesMap = new HashMap<>(); + + for (JanusGraphVertex vertex : vertices) { + Map properties = new HashMap<>(); + for (String key : keys) { + properties.put(key, vertex.property(key).orElse(null)); + } + vertexPropertiesMap.put(vertex, properties); + } + return vertexPropertiesMap; + } finally { + /*if(transaction != null) { + transaction.commit(); + transaction.close(); + }*/ + } + + } + + // Populate AtlasEntityHeader + private void populateEntityHeader(AtlasEntityHeader ret, AtlasVertex entityVertex, Map vertexProperties, Set attributes) throws AtlasBaseException { + String typeName = (String) vertexProperties.get(Constants.TYPE_NAME_PROPERTY_KEY); + String guid = (String) vertexProperties.get(Constants.GUID_PROPERTY_KEY); + String createdBy = (String) vertexProperties.get(Constants.CREATED_BY_KEY); + String updatedBy = (String) vertexProperties.get(Constants.MODIFIED_BY_KEY); + Long createTime = (Long) vertexProperties.get(Constants.TIMESTAMP_PROPERTY_KEY); + Long updateTime = (Long) vertexProperties.get(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY); + Boolean isIncomplete = isEntityIncomplete(entityVertex); + + ret.setTypeName(typeName); + ret.setGuid(guid); + ret.setStatus(GraphHelper.getStatus(entityVertex)); + ret.setIsIncomplete(isIncomplete); + ret.setCreatedBy(createdBy); + ret.setUpdatedBy(updatedBy); + ret.setCreateTime(createTime != null ? new Date(createTime) : null); + ret.setUpdateTime(updateTime != null ? new Date(updateTime) : null); + ret.setLabels(getLabels(entityVertex)); + + // Classifications + RequestContext context = RequestContext.get(); + if (context.includeClassifications() || context.isIncludeClassificationNames()) { + ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); + } + + // Meanings + if (context.includeMeanings()) { + List termAssignmentHeaders = mapAssignedTerms(entityVertex); + ret.setMeanings(termAssignmentHeaders); + ret.setMeaningNames(termAssignmentHeaders.stream() + .map(AtlasTermAssignmentHeader::getDisplayText) + .collect(Collectors.toList())); + } + + // Process entity type and attributes + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + if (entityType != null) { + // Header attributes + for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { + Object attrValue = getVertexAttributeFromBatch(vertexProperties, headerAttribute); + if (attrValue != null) { + ret.setAttribute(headerAttribute.getName(), attrValue); + } + } + + // Display text + Object displayText = getDisplayText(entityVertex, entityType); + if (displayText != null) { + ret.setDisplayText(displayText.toString()); + } + + // Additional attributes + if (CollectionUtils.isNotEmpty(attributes)) { + for (String attrName : attributes) { + AtlasAttribute attribute = getEntityOrRelationshipAttribute(entityType, attrName); + if (attribute != null) { + Object attrValue = getVertexAttributeFromBatch(vertexProperties, attribute); + if (attrValue != null) { + ret.setAttribute(attrName, attrValue); + } + } + } + } + } + + // Additional properties like classifications, meanings, and attributes... + } + private Object getCommonProperty(Map vertexProperties, String propertyName) { if (vertexProperties.get(propertyName) instanceof List) { return ((List) vertexProperties.get(propertyName)).get(0); From 650cfd16565ea87adbc5a87cf3c58cecef17214b Mon Sep 17 00:00:00 2001 From: aarshi Date: Thu, 21 Nov 2024 14:29:28 +0530 Subject: [PATCH 4/4] import function --- .../java/org/apache/atlas/discovery/EntityDiscoveryService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index fb932df7aa..29a33d676b 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -73,6 +73,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.*;