|
17 | 17 | */
|
18 | 18 | package org.apache.atlas.repository.store.graph.v2;
|
19 | 19 |
|
| 20 | +import com.google.common.collect.Sets; |
20 | 21 | import com.fasterxml.jackson.core.type.TypeReference;
|
21 | 22 | import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
22 | 23 | import org.apache.atlas.AtlasConfiguration;
|
|
49 | 50 | import org.apache.atlas.repository.graphdb.AtlasElement;
|
50 | 51 | import org.apache.atlas.repository.graphdb.AtlasGraph;
|
51 | 52 | import org.apache.atlas.repository.graphdb.AtlasVertex;
|
| 53 | +import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; |
| 54 | +import org.apache.atlas.repository.graphdb.janus.AtlasJanusVertex; |
52 | 55 | import org.apache.atlas.type.AtlasArrayType;
|
53 | 56 | import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
|
54 | 57 | import org.apache.atlas.type.AtlasEntityType;
|
|
67 | 70 | import org.apache.commons.collections.CollectionUtils;
|
68 | 71 | import org.apache.commons.collections.MapUtils;
|
69 | 72 | import org.apache.commons.lang3.StringUtils;
|
| 73 | +import org.janusgraph.core.JanusGraphMultiVertexQuery; |
| 74 | +import org.janusgraph.core.JanusGraphTransaction; |
| 75 | +import org.janusgraph.core.JanusGraphVertex; |
70 | 76 | import org.slf4j.Logger;
|
71 | 77 | import org.slf4j.LoggerFactory;
|
72 | 78 | import org.springframework.stereotype.Component;
|
@@ -1093,6 +1099,188 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex,
|
1093 | 1099 | return ret;
|
1094 | 1100 | }
|
1095 | 1101 |
|
| 1102 | + public List<AtlasEntityHeader> mapVerticesToAtlasEntityHeader(List<AtlasVertex> entityVertices, Set<String> attributes) throws AtlasBaseException { |
| 1103 | + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("mapVerticesToAtlasEntityHeader"); |
| 1104 | + |
| 1105 | + List<AtlasEntityHeader> results = new ArrayList<>(); |
| 1106 | + |
| 1107 | + try { |
| 1108 | + // Convert AtlasVertex to JanusGraphVertex |
| 1109 | + |
| 1110 | + // Use multiQuery for optimized property fetching |
| 1111 | + Map<JanusGraphVertex, Map<String, Object>> multiQueryResults = getBatchPropertiesWithMultiQuery(entityVertices, attributes); |
| 1112 | + Map<JanusGraphVertex, AtlasVertex> map = getJanusGraphVerticesMap(entityVertices); |
| 1113 | + multiQueryResults.forEach((janusGraphVertex, vertexProperties) -> { |
| 1114 | + AtlasEntityHeader ret = new AtlasEntityHeader(); |
| 1115 | + |
| 1116 | + // Populate AtlasEntityHeader with fetched properties |
| 1117 | + try { |
| 1118 | + populateEntityHeader(ret, map.get(janusGraphVertex), vertexProperties, attributes); |
| 1119 | + } catch (AtlasBaseException e) { |
| 1120 | + throw new RuntimeException(e); |
| 1121 | + } |
| 1122 | + results.add(ret); |
| 1123 | + }); |
| 1124 | + |
| 1125 | + } finally { |
| 1126 | + RequestContext.get().endMetricRecord(metricRecorder); |
| 1127 | + } |
| 1128 | + |
| 1129 | + return results; |
| 1130 | + } |
| 1131 | + |
| 1132 | + private Map<JanusGraphVertex, AtlasVertex> getJanusGraphVerticesMap(List<AtlasVertex> vertices) { |
| 1133 | + Map<JanusGraphVertex, AtlasVertex> resultMap = new HashMap<>(); |
| 1134 | + |
| 1135 | + for (AtlasVertex vertex : vertices) { |
| 1136 | + if (vertex instanceof AtlasJanusVertex) { |
| 1137 | + Object wrappedElement = ((AtlasJanusVertex) vertex).getWrappedElement(); |
| 1138 | + |
| 1139 | + if (wrappedElement instanceof JanusGraphVertex) { |
| 1140 | + resultMap.put((JanusGraphVertex) wrappedElement, vertex); |
| 1141 | + } else { |
| 1142 | + throw new IllegalArgumentException("Wrapped element is not an instance of JanusGraphVertex"); |
| 1143 | + } |
| 1144 | + } else { |
| 1145 | + throw new IllegalArgumentException("Provided vertex is not an instance of AtlasJanusVertex"); |
| 1146 | + } |
| 1147 | + } |
| 1148 | + |
| 1149 | + return resultMap; |
| 1150 | + } |
| 1151 | + |
| 1152 | + // Helper to convert AtlasVertex to JanusGraphVertex |
| 1153 | + private List<JanusGraphVertex> getJanusGraphVertices(List<AtlasVertex> vertices) { |
| 1154 | + List<JanusGraphVertex> results = new ArrayList<>(); |
| 1155 | + for(AtlasVertex vertex : vertices) { |
| 1156 | + if (((AtlasJanusVertex) vertex).getWrappedElement() instanceof JanusGraphVertex) { |
| 1157 | + results.add(vertex.getWrappedElement()); |
| 1158 | + } else { |
| 1159 | + throw new IllegalArgumentException("Provided vertex is not an instance of JanusGraphVertex"); |
| 1160 | + } |
| 1161 | + } |
| 1162 | + return results; |
| 1163 | + } |
| 1164 | + |
| 1165 | + // Use multiQuery to batch-fetch properties |
| 1166 | + private Map<JanusGraphVertex, Map<String, Object>> getBatchPropertiesWithMultiQuery(List<AtlasVertex> entityVertices, Set<String> attributes) { |
| 1167 | + Iterable<JanusGraphVertex> vertices = getJanusGraphVertices(entityVertices); |
| 1168 | + List<JanusGraphVertex> target = new ArrayList<>(); |
| 1169 | + vertices.forEach(target::add); |
| 1170 | + JanusGraphTransaction transaction = ((AtlasJanusGraph)graph).getTransaction(); |
| 1171 | + try { |
| 1172 | + JanusGraphMultiVertexQuery multiQuery = transaction.multiQuery(target); |
| 1173 | + |
| 1174 | + Set<String> keys = Sets.newHashSet(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY, |
| 1175 | + Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY, |
| 1176 | + Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY); |
| 1177 | + |
| 1178 | + keys.addAll(attributes); |
| 1179 | + |
| 1180 | + multiQuery.keys(Constants.TYPE_NAME_PROPERTY_KEY, Constants.GUID_PROPERTY_KEY, |
| 1181 | + Constants.CREATED_BY_KEY, Constants.MODIFIED_BY_KEY, |
| 1182 | + Constants.TIMESTAMP_PROPERTY_KEY, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, STATE_PROPERTY_KEY); |
| 1183 | + |
| 1184 | + Map<JanusGraphVertex, Map<String, Object>> vertexPropertiesMap = new HashMap<>(); |
| 1185 | + |
| 1186 | + for (JanusGraphVertex vertex : vertices) { |
| 1187 | + Map<String, Object> properties = new HashMap<>(); |
| 1188 | + for (String key : keys) { |
| 1189 | + properties.put(key, vertex.property(key).orElse(null)); |
| 1190 | + } |
| 1191 | + vertexPropertiesMap.put(vertex, properties); |
| 1192 | + } |
| 1193 | + return vertexPropertiesMap; |
| 1194 | + } finally { |
| 1195 | + if(transaction != null) { |
| 1196 | + transaction.commit(); |
| 1197 | + transaction.close(); |
| 1198 | + } |
| 1199 | + } |
| 1200 | + |
| 1201 | + } |
| 1202 | + |
| 1203 | + // Populate AtlasEntityHeader |
| 1204 | + private void populateEntityHeader(AtlasEntityHeader ret, AtlasVertex entityVertex, Map<String, Object> vertexProperties, Set<String> attributes) throws AtlasBaseException { |
| 1205 | + String typeName = (String) vertexProperties.get(Constants.TYPE_NAME_PROPERTY_KEY); |
| 1206 | + String guid = (String) vertexProperties.get(Constants.GUID_PROPERTY_KEY); |
| 1207 | + String createdBy = (String) vertexProperties.get(Constants.CREATED_BY_KEY); |
| 1208 | + String updatedBy = (String) vertexProperties.get(Constants.MODIFIED_BY_KEY); |
| 1209 | + Long createTime = (Long) vertexProperties.get(Constants.TIMESTAMP_PROPERTY_KEY); |
| 1210 | + Long updateTime = (Long) vertexProperties.get(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY); |
| 1211 | + Boolean isIncomplete = isEntityIncomplete(entityVertex); |
| 1212 | + |
| 1213 | + ret.setTypeName(typeName); |
| 1214 | + ret.setGuid(guid); |
| 1215 | + ret.setStatus(GraphHelper.getStatus(entityVertex)); |
| 1216 | + ret.setIsIncomplete(isIncomplete); |
| 1217 | + ret.setCreatedBy(createdBy); |
| 1218 | + ret.setUpdatedBy(updatedBy); |
| 1219 | + ret.setCreateTime(createTime != null ? new Date(createTime) : null); |
| 1220 | + ret.setUpdateTime(updateTime != null ? new Date(updateTime) : null); |
| 1221 | + ret.setLabels(getLabels(entityVertex)); |
| 1222 | + |
| 1223 | + // Classifications |
| 1224 | + RequestContext context = RequestContext.get(); |
| 1225 | + if (context.includeClassifications() || context.isIncludeClassificationNames()) { |
| 1226 | + ret.setClassificationNames(getAllTraitNamesFromAttribute(entityVertex)); |
| 1227 | + } |
| 1228 | + |
| 1229 | + // Meanings |
| 1230 | + if (context.includeMeanings()) { |
| 1231 | + List<AtlasTermAssignmentHeader> termAssignmentHeaders = mapAssignedTerms(entityVertex); |
| 1232 | + ret.setMeanings(termAssignmentHeaders); |
| 1233 | + ret.setMeaningNames(termAssignmentHeaders.stream() |
| 1234 | + .map(AtlasTermAssignmentHeader::getDisplayText) |
| 1235 | + .collect(Collectors.toList())); |
| 1236 | + } |
| 1237 | + |
| 1238 | + // Process entity type and attributes |
| 1239 | + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); |
| 1240 | + if (entityType != null) { |
| 1241 | + // Header attributes |
| 1242 | + for (AtlasAttribute headerAttribute : entityType.getHeaderAttributes().values()) { |
| 1243 | + Object attrValue = getVertexAttributeFromBatch(vertexProperties, headerAttribute); |
| 1244 | + if (attrValue != null) { |
| 1245 | + ret.setAttribute(headerAttribute.getName(), attrValue); |
| 1246 | + } |
| 1247 | + } |
| 1248 | + |
| 1249 | + // Display text |
| 1250 | + Object displayText = getDisplayText(entityVertex, entityType); |
| 1251 | + if (displayText != null) { |
| 1252 | + ret.setDisplayText(displayText.toString()); |
| 1253 | + } |
| 1254 | + |
| 1255 | + // Additional attributes |
| 1256 | + if (CollectionUtils.isNotEmpty(attributes)) { |
| 1257 | + for (String attrName : attributes) { |
| 1258 | + AtlasAttribute attribute = getEntityOrRelationshipAttribute(entityType, attrName); |
| 1259 | + if (attribute != null) { |
| 1260 | + Object attrValue = getVertexAttributeFromBatch(vertexProperties, attribute); |
| 1261 | + if (attrValue != null) { |
| 1262 | + ret.setAttribute(attrName, attrValue); |
| 1263 | + } |
| 1264 | + } |
| 1265 | + } |
| 1266 | + } |
| 1267 | + } |
| 1268 | + |
| 1269 | + // Additional properties like classifications, meanings, and attributes... |
| 1270 | + } |
| 1271 | + |
| 1272 | + /** |
| 1273 | + * Retrieves a vertex attribute from the pre-fetched batch of properties. |
| 1274 | + */ |
| 1275 | + private Object getVertexAttributeFromBatch(Map<String, Object> properties, AtlasAttribute attribute) { |
| 1276 | + if (properties == null || attribute == null) { |
| 1277 | + return null; |
| 1278 | + } |
| 1279 | + |
| 1280 | + String propertyKey = attribute.getVertexPropertyName(); |
| 1281 | + return properties.get(propertyKey); |
| 1282 | + } |
| 1283 | + |
1096 | 1284 | /**
|
1097 | 1285 | * Retrieves an entity or relationship attribute from the entity type.
|
1098 | 1286 | */
|
|
0 commit comments