Skip to content

Commit

Permalink
Merge pull request #3781 from atlanhq/dev/janusoptimisation
Browse files Browse the repository at this point in the history
Dev/janusoptimisation
  • Loading branch information
aarshi0301 authored Nov 21, 2024
2 parents ceb2919 + 25cfd6a commit bbe483c
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.JanusGraphIndexQuery;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.SchemaViolationException;
import org.janusgraph.core.*;
import org.janusgraph.core.schema.JanusGraphIndex;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.Parameter;
Expand Down Expand Up @@ -696,4 +691,8 @@ public void setEnableCache(boolean enableCache) {
/**
 * Reports the cache-enabled flag as answered by the wrapped {@code janusGraph} instance.
 *
 * @return the value returned by {@code janusGraph.isCacheEnabled()}, boxed as {@link Boolean}
 */
public Boolean isCacheEnabled() {
return this.janusGraph.isCacheEnabled();
}

/**
 * Returns the transaction produced by {@code janusGraph.newThreadBoundTransaction()}.
 *
 * <p>NOTE(review): despite the getter-style name, this delegates to a {@code new...} factory
 * method, so each call presumably starts a fresh transaction that the caller has to commit,
 * roll back or close — confirm against the JanusGraph documentation.
 *
 * @return the transaction handed back by the wrapped graph
 */
public JanusGraphTransaction getTransaction() {
return this.janusGraph.newThreadBoundTransaction();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.*;
Expand All @@ -87,7 +88,6 @@
public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AtlasConfiguration.THREADS_TO_BE_SPAWNED.getInt()); // Use half of available cores

private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;
Expand Down Expand Up @@ -1094,15 +1094,19 @@ private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryRes
.collect(Collectors.toList());

// Use ConcurrentHashMap for thread-safe access
ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>();
//ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();
//ConcurrentHashMap<String, AtlasEntityHeader> entitiesSet = new ConcurrentHashMap<>();

List<AtlasEntityHeader> headers = entityRetriever.mapVerticesToAtlasEntityHeader(vertices, resultAttributes);
// Build a Map<String, AtlasEntityHeader> keyed by entity GUID
Map<String, AtlasEntityHeader> entitiesSet = headers.stream().collect(Collectors.toMap(AtlasEntityHeader::getGuid, Function.identity()));

// Run vertex processing in limited parallel threads
CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> {
/**CompletableFuture.runAsync(() -> vertices.parallelStream().forEach(vertex -> {
String guid = vertex.getProperty("__guid", String.class);
headers.computeIfAbsent(guid, k -> {
try {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
//AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if (RequestContext.get().includeClassifications()) {
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
Expand All @@ -1111,15 +1115,15 @@ private void prepareSearchResultAsync(AtlasSearchResult ret, DirectIndexQueryRes
throw new RuntimeException("Failed to process vertex with GUID: " + guid, e);
}
});
}), CUSTOMTHREADPOOL).join();
}), CUSTOMTHREADPOOL).join();*/

// Process results and handle collapse in parallel
results.parallelStream().forEach(result -> {
AtlasVertex vertex = result.getVertex();
if (vertex == null) return;

String guid = vertex.getProperty("__guid", String.class);
AtlasEntityHeader header = headers.get(guid);
AtlasEntityHeader header = entitiesSet.get(guid);

if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
Expand Down Expand Up @@ -1230,8 +1234,9 @@ private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult i
if (AtlasConfiguration.ENABLE_JANUS_GRAPH_OPTIMISATION.getBoolean()) {
LOG.debug("enabled janusGraphOptimisation");
prepareSearchResultAsync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
} else {
prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
}
prepareSearchResultSync(ret, indexQueryResult, resultAttributes, fetchCollapsedResults);
}
// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ private EntityAuditEventV2 createEvent(EntityAuditEventV2 entityAuditEventV2, At
String qualifiedName = (String) originalEntity.getAttribute(QUALIFIED_NAME);
entityAuditEventV2.setEntityQualifiedName(AtlasType.toJson(qualifiedName));
} else {
String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME);
String qualifiedName = ((List)entity.getAttribute(QUALIFIED_NAME)).get(0).toString();
entityAuditEventV2.setEntityQualifiedName(AtlasType.toJson(qualifiedName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
Expand Down Expand Up @@ -49,6 +50,8 @@
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph;
import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasEntityType;
Expand All @@ -67,6 +70,9 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.janusgraph.core.JanusGraphMultiVertexQuery;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -1174,6 +1180,176 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex,
}


/**
 * Builds an {@link AtlasEntityHeader} for every entry of the batch property map computed
 * for the given vertices.
 *
 * <p>The headers are emitted in the iteration order of an intermediate hash map, so the
 * returned list does not necessarily follow the order of {@code entityVertices}.
 *
 * @param entityVertices vertices to convert; {@code null} or empty yields an empty list
 * @param attributes     names of additional attributes to copy into each header
 * @return the populated headers, never {@code null}
 * @throws AtlasBaseException       if populating a header fails
 * @throws IllegalArgumentException if a vertex is not backed by a JanusGraph vertex
 */
public List<AtlasEntityHeader> mapVerticesToAtlasEntityHeader(List<AtlasVertex> entityVertices, Set<String> attributes) throws AtlasBaseException {
    AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVerticesToAtlasEntityHeader");

    try {
        // Nothing to map: avoid opening a graph transaction for an empty request.
        if (CollectionUtils.isEmpty(entityVertices)) {
            return new ArrayList<>();
        }

        Map<JanusGraphVertex, Map<String, Object>> multiQueryResults = getBatchPropertiesWithMultiQuery(entityVertices, attributes);
        Map<JanusGraphVertex, AtlasVertex> atlasVertexByJanusVertex = getJanusGraphVerticesMap(entityVertices);

        List<AtlasEntityHeader> results = new ArrayList<>(multiQueryResults.size());

        // A plain loop (instead of Map.forEach with a lambda) lets the declared
        // AtlasBaseException reach the caller unwrapped, keeping its error code.
        for (Map.Entry<JanusGraphVertex, Map<String, Object>> entry : multiQueryResults.entrySet()) {
            AtlasEntityHeader ret = new AtlasEntityHeader();

            populateEntityHeader(ret, atlasVertexByJanusVertex.get(entry.getKey()), entry.getValue(), attributes);
            results.add(ret);
        }

        return results;
    } finally {
        RequestContext.get().endMetricRecord(metricRecorder);
    }
}

/**
 * Indexes the given Atlas vertices by the JanusGraph vertex that each of them wraps.
 *
 * @param vertices vertices to index
 * @return a map from wrapped {@link JanusGraphVertex} to the wrapping {@link AtlasVertex}
 * @throws IllegalArgumentException if a vertex is not an {@code AtlasJanusVertex}, or its
 *                                  wrapped element is not a {@link JanusGraphVertex}
 */
private Map<JanusGraphVertex, AtlasVertex> getJanusGraphVerticesMap(List<AtlasVertex> vertices) {
    Map<JanusGraphVertex, AtlasVertex> atlasByJanus = new HashMap<>();

    for (AtlasVertex candidate : vertices) {
        if (!(candidate instanceof AtlasJanusVertex)) {
            throw new IllegalArgumentException("Provided vertex is not an instance of AtlasJanusVertex");
        }

        Object underlying = ((AtlasJanusVertex) candidate).getWrappedElement();

        if (!(underlying instanceof JanusGraphVertex)) {
            throw new IllegalArgumentException("Wrapped element is not an instance of JanusGraphVertex");
        }

        atlasByJanus.put((JanusGraphVertex) underlying, candidate);
    }

    return atlasByJanus;
}

/**
 * Unwraps each Atlas vertex into the JanusGraph vertex it is backed by, preserving order.
 *
 * <p>The type of every vertex is checked before casting so that an unsupported vertex is
 * reported as {@link IllegalArgumentException}, matching {@code getJanusGraphVerticesMap},
 * instead of surfacing as a {@link ClassCastException}.
 *
 * @param vertices vertices to unwrap
 * @return the wrapped JanusGraph vertices, in the order of {@code vertices}
 * @throws IllegalArgumentException if a vertex is not an {@code AtlasJanusVertex}, or its
 *                                  wrapped element is not a {@link JanusGraphVertex}
 */
private List<JanusGraphVertex> getJanusGraphVertices(List<AtlasVertex> vertices) {
    List<JanusGraphVertex> results = new ArrayList<>(vertices.size());

    for (AtlasVertex vertex : vertices) {
        if (!(vertex instanceof AtlasJanusVertex)) {
            throw new IllegalArgumentException("Provided vertex is not an instance of AtlasJanusVertex");
        }

        Object wrappedElement = ((AtlasJanusVertex) vertex).getWrappedElement();

        if (!(wrappedElement instanceof JanusGraphVertex)) {
            throw new IllegalArgumentException("Provided vertex is not an instance of JanusGraphVertex");
        }

        results.add((JanusGraphVertex) wrappedElement);
    }

    return results;
}

/**
 * Reads the common header properties plus the requested attributes for every vertex,
 * after issuing a single multi-vertex query for the common header keys.
 *
 * <p>The multi-vertex query is configured with the common header keys and
 * {@code STATE_PROPERTY_KEY} and then executed; its result is discarded and the values are
 * still read per vertex and per key. Presumably the executed query warms the per-vertex
 * cache so those reads avoid one backend round trip each — confirm against the JanusGraph
 * version in use. The requested {@code attributes} are not part of the multi-query keys.
 *
 * @param entityVertices vertices whose properties are read
 * @param attributes     extra property names to read; {@code null} is treated as empty
 * @return for each JanusGraph vertex, a map from property key to its value
 *         ({@code null} when the property is absent)
 * @throws IllegalArgumentException if a vertex is not backed by a JanusGraph vertex
 */
private Map<JanusGraphVertex, Map<String, Object>> getBatchPropertiesWithMultiQuery(List<AtlasVertex> entityVertices, Set<String> attributes) {
    List<JanusGraphVertex> vertices = getJanusGraphVertices(entityVertices);
    Map<JanusGraphVertex, Map<String, Object>> vertexPropertiesMap = new HashMap<>();

    // No vertices: skip opening a transaction and running an empty query.
    if (vertices.isEmpty()) {
        return vertexPropertiesMap;
    }

    Set<String> keys = Sets.newHashSet(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY,
            Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY,
            Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY);

    // populateEntityHeader tolerates null attributes, so this helper must as well.
    if (attributes != null) {
        keys.addAll(attributes);
    }

    // NOTE(review): the transaction obtained here is never committed, rolled back or closed
    // (the original cleanup was commented out) — confirm its lifecycle is managed elsewhere.
    JanusGraphTransaction transaction = ((AtlasJanusGraph) graph).getTransaction();
    JanusGraphMultiVertexQuery multiQuery = transaction.multiQuery(vertices);

    multiQuery.keys(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY,
            Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY,
            Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, STATE_PROPERTY_KEY);

    // Actually run the batched query; without this call the builder above has no effect.
    multiQuery.properties();

    for (JanusGraphVertex vertex : vertices) {
        Map<String, Object> properties = new HashMap<>();

        for (String key : keys) {
            properties.put(key, vertex.property(key).orElse(null));
        }

        vertexPropertiesMap.put(vertex, properties);
    }

    return vertexPropertiesMap;
}

/**
 * Fills {@code ret} from the pre-read property map of one vertex.
 *
 * <p>Type name, GUID, creator/modifier and timestamps come from {@code vertexProperties};
 * status, incompleteness, labels, classification names, meanings and display text are
 * obtained through helpers that take {@code entityVertex}. Header attributes and the
 * requested {@code attributes} are resolved against the entity type and copied only when
 * their value is non-null. Nothing type-specific is set when the type is not registered.
 *
 * @param ret              header to populate
 * @param entityVertex     vertex the header describes
 * @param vertexProperties property values previously read for {@code entityVertex}
 * @param attributes       additional attribute names to copy; may be {@code null} or empty
 * @throws AtlasBaseException propagated from the helper calls
 */
private void populateEntityHeader(AtlasEntityHeader ret, AtlasVertex entityVertex, Map<String, Object> vertexProperties, Set<String> attributes) throws AtlasBaseException {
    final String  entityTypeName  = (String) vertexProperties.get(Constants.TYPE_NAME_PROPERTY_KEY);
    final String  entityGuid      = (String) vertexProperties.get(Constants.GUID_PROPERTY_KEY);
    final String  creator         = (String) vertexProperties.get(Constants.CREATED_BY_KEY);
    final String  modifier        = (String) vertexProperties.get(Constants.MODIFIED_BY_KEY);
    final Long    createdAtMillis = (Long) vertexProperties.get(Constants.TIMESTAMP_PROPERTY_KEY);
    final Long    updatedAtMillis = (Long) vertexProperties.get(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY);
    final Boolean incomplete      = isEntityIncomplete(entityVertex);

    ret.setTypeName(entityTypeName);
    ret.setGuid(entityGuid);
    ret.setStatus(GraphHelper.getStatus(entityVertex));
    ret.setIsIncomplete(incomplete);
    ret.setCreatedBy(creator);
    ret.setUpdatedBy(modifier);

    Date createdAt = (createdAtMillis == null) ? null : new Date(createdAtMillis);
    Date updatedAt = (updatedAtMillis == null) ? null : new Date(updatedAtMillis);
    ret.setCreateTime(createdAt);
    ret.setUpdateTime(updatedAt);
    ret.setLabels(getLabels(entityVertex));

    RequestContext requestContext = RequestContext.get();

    // Classification names are needed when either flag of the request asks for them.
    boolean wantsClassificationNames = requestContext.includeClassifications() || requestContext.isIncludeClassificationNames();
    if (wantsClassificationNames) {
        ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex));
    }

    if (requestContext.includeMeanings()) {
        List<AtlasTermAssignmentHeader> assignedTerms = mapAssignedTerms(entityVertex);
        ret.setMeanings(assignedTerms);

        List<String> termDisplayTexts = new ArrayList<>();
        for (AtlasTermAssignmentHeader assignedTerm : assignedTerms) {
            termDisplayTexts.add(assignedTerm.getDisplayText());
        }
        ret.setMeaningNames(termDisplayTexts);
    }

    // Everything below depends on the registered entity type.
    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
    if (entityType == null) {
        return;
    }

    for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) {
        Object headerValue = getVertexAttributeFromBatch(vertexProperties, headerAttribute);
        if (headerValue != null) {
            ret.setAttribute(headerAttribute.getName(), headerValue);
        }
    }

    Object displayText = getDisplayText(entityVertex, entityType);
    if (displayText != null) {
        ret.setDisplayText(displayText.toString());
    }

    if (CollectionUtils.isEmpty(attributes)) {
        return;
    }

    for (String requestedName : attributes) {
        AtlasAttribute requested = getEntityOrRelationshipAttribute(entityType, requestedName);
        if (requested == null) {
            continue;
        }

        Object requestedValue = getVertexAttributeFromBatch(vertexProperties, requested);
        if (requestedValue != null) {
            ret.setAttribute(requestedName, requestedValue);
        }
    }
}

private Object getCommonProperty(Map<String, Object> vertexProperties, String propertyName) {
if (vertexProperties.get(propertyName) instanceof List) {
return ((List<?>) vertexProperties.get(propertyName)).get(0);
Expand Down

0 comments on commit bbe483c

Please sign in to comment.