Merge branch 'janusoptimisation' into janusupgrademultiquery
aarshi0301 committed Nov 27, 2024
2 parents 447204d + 50d37fc · commit a51bea1
Showing 2 changed files with 70 additions and 29 deletions.
@@ -334,10 +334,27 @@ private void setPolicyItems(RangerPolicy rangerPolicy, AtlasEntityHeader atlasPo

List<String> users = (List<String>) atlasPolicy.getAttribute("policyUsers");
List<String> groups = (List<String>) atlasPolicy.getAttribute("policyGroups");
List<String> roles = (List<String>) atlasPolicy.getAttribute("policyRoles");

List<String> roles = null;
if (atlasPolicy.getAttribute("policyRoles") instanceof String) {
String policyRoles = (String) atlasPolicy.getAttribute("policyRoles");
LOG.info("policyRoles", policyRoles);
roles = new ArrayList<>(Arrays.asList(policyRoles));
} else {
roles = (List<String>) atlasPolicy.getAttribute("policyRoles");
}


List<RangerPolicyItemAccess> accesses = new ArrayList<>();
List<String> actions = (List<String>) atlasPolicy.getAttribute("policyActions");
List<String> actions = null;

if (atlasPolicy.getAttribute("policyActions") instanceof String) {
String policyActions = (String) atlasPolicy.getAttribute("policyActions");
LOG.info("policyActions", policyActions);
actions = new ArrayList<>(Arrays.asList(policyActions));
} else {
actions = (List<String>) atlasPolicy.getAttribute("policyActions");
}

actions.forEach(action -> accesses.add(new RangerPolicyItemAccess(action)));

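For context, the hunk above normalizes Atlas policy attributes that may come back either as a single String or as a List<String>. A minimal, self-contained sketch of that idea follows; PolicyAttributeUtil and asStringList are illustrative names, not part of the commit, and unlike the committed code the helper also guards against a null attribute value.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class PolicyAttributeUtil {

    // Mirrors the inline instanceof checks in the diff: a value that may be a
    // single String or a List<String> is always returned as a List<String>.
    @SuppressWarnings("unchecked")
    public static List<String> asStringList(Object attributeValue) {
        if (attributeValue == null) {
            return Collections.emptyList();
        }
        if (attributeValue instanceof String) {
            return new ArrayList<>(Arrays.asList((String) attributeValue));
        }
        return (List<String>) attributeValue;
    }

    public static void main(String[] args) {
        System.out.println(asStringList("admins"));                // [admins]
        System.out.println(asStringList(Arrays.asList("a", "b"))); // [a, b]
        System.out.println(asStringList(null));                    // []
    }
}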
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.store.graph.v2;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
@@ -70,9 +69,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.janusgraph.core.JanusGraphMultiVertexQuery;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -94,11 +91,10 @@
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CONFIDENCE;
import static org.apache.atlas.glossary.GlossaryUtils.TERM_ASSIGNMENT_ATTR_CREATED_BY;
@@ -137,6 +133,7 @@
@Component
public class EntityGraphRetriever {
private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
private static final int BATCH_SIZE = 300;

private static final String GLOSSARY_TERM_DISPLAY_NAME_ATTR = "name";
public static final String TERM_RELATION_NAME = "AtlasGlossarySemanticAssignment";
@@ -1103,12 +1100,12 @@ public List<AtlasEntityHeader> mapVerticesToAtlasEntityHeader(List<AtlasVertex>
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVerticesToAtlasEntityHeader");

List<AtlasEntityHeader> results = new ArrayList<>();

try {
// Convert AtlasVertex to JanusGraphVertex

// Use multiQuery for optimized property fetching
Map<JanusGraphVertex, Map<String, Object>> multiQueryResults = getBatchPropertiesWithMultiQuery(entityVertices, attributes);
AtlasJanusGraph janusGraph = ((AtlasJanusGraph)graph);
Map<JanusGraphVertex, Map<String, Object>> multiQueryResults = getBatchPropertiesWithMultiQuery(entityVertices, attributes, janusGraph);
Map<JanusGraphVertex, AtlasVertex> map = getJanusGraphVerticesMap(entityVertices);
multiQueryResults.forEach((janusGraphVertex, vertexProperties) -> {
AtlasEntityHeader ret = new AtlasEntityHeader();
@@ -1162,40 +1159,67 @@ private List<JanusGraphVertex> getJanusGraphVertices(List<AtlasVertex> vertices)
return results;
}

// Use multiQuery to batch-fetch properties
private Map<JanusGraphVertex, Map<String, Object>> getBatchPropertiesWithMultiQuery(List<AtlasVertex> entityVertices, Set<String> attributes) {
Iterable<JanusGraphVertex> vertices = getJanusGraphVertices(entityVertices);
List<JanusGraphVertex> target = new ArrayList<>();
vertices.forEach(target::add);
JanusGraphTransaction transaction = ((AtlasJanusGraph)graph).getTransaction();
public Map<JanusGraphVertex, Map<String, Object>> getBatchPropertiesWithMultiQuery(
List<AtlasVertex> entityVertices, Set<String> attributes, AtlasJanusGraph graph) {

Map<JanusGraphVertex, Map<String, Object>> result = new ConcurrentHashMap<>();
List<JanusGraphVertex> vertices = getJanusGraphVertices(entityVertices);

// Split vertices into batches and process them in parallel
AtomicInteger counter = new AtomicInteger(0);
StreamSupport.stream(vertices.spliterator(), false)
.collect(Collectors.groupingBy(v -> counter.getAndIncrement() / BATCH_SIZE))
.values()
.parallelStream()
.forEach(batch -> {
Map<JanusGraphVertex, Map<String, Object>> batchResult = processBatch(batch, attributes, graph);
result.putAll(batchResult);
});

return result;
}

private Map<JanusGraphVertex, Map<String, Object>> processBatch(
List<JanusGraphVertex> batch, Set<String> attributes, AtlasJanusGraph graph) {

JanusGraphTransaction transaction = graph.getTransaction();
try {
JanusGraphMultiVertexQuery multiQuery = transaction.multiQuery(target);
JanusGraphMultiVertexQuery multiQuery = transaction.multiQuery(batch);

Set<String> keys = Sets.newHashSet(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY,
// Add all required keys to fetch
Set<String> keys = new HashSet<>(Arrays.asList(
Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY,
Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY,
Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY);

Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY
));
keys.addAll(attributes);

// Set the keys in the multiQuery
multiQuery.keys(keys.toArray(new String[0]));

// Fetch properties in bulk
Map<JanusGraphVertex, Iterable<JanusGraphProperty>> results = multiQuery.properties();

// Convert results to the desired map format
Map<JanusGraphVertex, Map<String, Object>> vertexPropertiesMap = new HashMap<>();
for (Map.Entry<JanusGraphVertex, Iterable<JanusGraphProperty>> entry : results.entrySet()) {
JanusGraphVertex vertex = entry.getKey();
Iterable<JanusGraphProperty> properties = entry.getValue();

for (JanusGraphVertex vertex : vertices) {
Map<String, Object> properties = new HashMap<>();
for (String key : keys) {
properties.put(key, vertex.property(key).orElse(null));
Map<String, Object> propertiesMap = new HashMap<>();
for (JanusGraphProperty property : properties) {
propertiesMap.put(property.key(), property.value());
}
vertexPropertiesMap.put(vertex, properties);
vertexPropertiesMap.put(vertex, propertiesMap);
}

return vertexPropertiesMap;
} finally {
if(transaction != null) {
if (transaction != null) {
transaction.commit();
transaction.close();
}
}

}

// Populate AtlasEntityHeader
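The heart of the second file's change is the batching pattern: split the vertex list into fixed-size groups of BATCH_SIZE, fetch each group's properties through a JanusGraph multiQuery, and merge the results in parallel. Below is a minimal, self-contained sketch of just the batching and merging part; BatchingSketch, processInBatches, and the toy square-number workload are illustrative and not part of the Atlas codebase, and the sketch deliberately omits the per-batch JanusGraph transaction that the committed processBatch opens and commits.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchingSketch {
    private static final int BATCH_SIZE = 300;

    // Split the input into fixed-size batches, process each batch on the common
    // fork-join pool, and merge the per-batch maps into one concurrent result map,
    // mirroring the AtomicInteger/groupingBy/parallelStream pattern in the diff.
    static <T, R> Map<T, R> processInBatches(List<T> items, Function<List<T>, Map<T, R>> batchFn) {
        Map<T, R> result = new ConcurrentHashMap<>();
        AtomicInteger counter = new AtomicInteger(0);

        items.stream()
             .collect(Collectors.groupingBy(item -> counter.getAndIncrement() / BATCH_SIZE))
             .values()
             .parallelStream()
             .forEach(batch -> result.putAll(batchFn.apply(batch)));

        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());

        // Toy batch function: map each id to its square, 300 ids per batch.
        Map<Integer, Integer> squares = processInBatches(ids,
                batch -> batch.stream().collect(Collectors.toMap(Function.identity(), n -> n * n)));

        System.out.println(squares.size()); // prints 1000
    }
}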
