Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.atlas.glossary.GlossaryService;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
Expand All @@ -34,6 +35,8 @@
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
Expand All @@ -43,24 +46,36 @@
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.inject.Inject;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;

@Component
public class ExportService {
Expand Down Expand Up @@ -91,13 +106,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
long startTime = System.currentTimeMillis();
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker());
ExportContext context = new ExportContext(result, exportSink);
RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor(typeRegistry);

exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);

try {
LOG.info("==> export(user={}, from={})", userName, requestingIP);

AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
AtlasExportResult.OperationStatus[] statuses = processItems(request, context, relationshipAttributesExtractor);

processTypesDef(context);

Expand Down Expand Up @@ -219,20 +235,20 @@ private void processTypesDef(ExportContext context) {
}
}

private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) {
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
List<AtlasObjectId> itemsToExport = request.getItemsToExport();
private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
List<AtlasObjectId> itemsToExport = request.getItemsToExport();

for (int i = 0; i < itemsToExport.size(); i++) {
AtlasObjectId item = itemsToExport.get(i);

statuses[i] = processObjectId(item, context);
statuses[i] = processObjectId(item, context, relationshipAttributesExtractor);
}

return statuses;
}

private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
LOG.debug("==> processObjectId({})", item);

try {
Expand Down Expand Up @@ -266,6 +282,9 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex

context.isSkipConnectedFetch = false;
}
for (String guid : entityGuids) {
addEntityGuids(guid, context, relationshipAttributesExtractor);
}
} catch (AtlasBaseException excp) {
LOG.error("Fetching entity failed for: {}", item, excp);

Expand Down Expand Up @@ -413,6 +432,93 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c
context.reportProgress();
}

public void addEntityGuids(String guid, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
AtlasVertex adjacentVertex;
Iterator<AtlasEdge> entityEdges;
Iterator<AtlasVertex> propagateClassificationVertices;
Iterator<AtlasVertex> appliedClassificationVertices;
String fetchedClassificationGuid;
String processInput = "__Process.inputs";
String processOutput = "__Process.outputs";
List<AtlasClassification> processedClassifications = new ArrayList<>();

AtlasVertex initialEntityVertex = entityGraphRetriever.getEntityVertex(guid);
for (AtlasClassification currentClassification : entityGraphRetriever.getAllClassifications(initialEntityVertex)) {
if (context.guidsProcessed.contains(currentClassification.getEntityGuid())) {
processedClassifications.add(currentClassification);
}
}
context.newAddedGuids.add(guid);
while (!context.newAddedGuids.isEmpty()) {
String currentGuid = context.newAddedGuids.poll();

AtlasVertex entityVertex = entityGraphRetriever.getEntityVertex(currentGuid);
String entityTypeName = getTypeName(entityVertex);
List<AtlasClassification> classifications = entityGraphRetriever.getAllClassifications(entityVertex);
if (CollectionUtils.isNotEmpty(processedClassifications)) {
classifications.removeAll(processedClassifications);
}
if (CollectionUtils.isNotEmpty(classifications)) {
for (AtlasClassification classification : classifications) {
String classificationName = classification.getTypeName();
boolean isProcess = relationshipAttributesExtractor.isLineageType(entityTypeName);
entityEdges = isProcess
? GraphHelper.getEdgesForLabel(entityVertex, processInput, OUT)
: GraphHelper.getEdgesForLabel(entityVertex, processOutput, IN);
while (entityEdges.hasNext()) {
AtlasEdge propagationEdge = entityEdges.next();
AtlasVertex outVertex = propagationEdge.getOutVertex();
AtlasVertex inVertex = propagationEdge.getInVertex();
adjacentVertex = StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) ? inVertex : outVertex;
String adjacentGuid = getGuid(adjacentVertex);
boolean isPropagated = false;
propagateClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, true, classificationName);
while (propagateClassificationVertices.hasNext()) {
AtlasVertex classificationVertex = propagateClassificationVertices.next();
fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
addAdjacentVertices(context, adjacentGuid);
isPropagated = true;
}
}
if (!isPropagated) {
appliedClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, false, classificationName);

while (appliedClassificationVertices.hasNext()) {
AtlasVertex classificationVertex = appliedClassificationVertices.next();
fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
addAdjacentVertices(context, adjacentGuid);
break;
}
}
}
}
}
}
}
}

private Iterator<AtlasVertex> getClassificationVertices(AtlasVertex inVertex, AtlasVertex outVertex,
boolean isProcess, boolean isPropagated, String name) {
AtlasVertex base = isProcess ? inVertex : outVertex;
return base.query()
.direction(AtlasEdgeDirection.OUT)
.label(CLASSIFICATION_LABEL)
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated)
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, name)
.vertices().iterator();
}

private void addAdjacentVertices(ExportContext context, String adjacentGuid) throws AtlasBaseException {
if (!context.newAddedGuids.contains(adjacentGuid)) {
context.newAddedGuids.add(adjacentGuid);
}
if (!context.sink.guids.contains(adjacentGuid)) {
context.addToSink(entityGraphRetriever.toAtlasEntityWithExtInfo(adjacentGuid));
}
}

public enum TraversalDirection {
UNKNOWN,
INWARD,
Expand Down Expand Up @@ -450,6 +556,7 @@ static class ExportContext {
final UniqueList<String> entityCreationOrder = new UniqueList<>();
final Set<String> guidsProcessed = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>();
final Queue<String> newAddedGuids = new ArrayDeque<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
Expand Down Expand Up @@ -511,6 +618,7 @@ public void clear() {
guidsToProcess.clear();
guidsProcessed.clear();
guidDirection.clear();
newAddedGuids.clear();
startingEntityType = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private TraversalDirection getRelationshipEdgeDirection(AtlasRelatedObjectId rel
return isOutEdge ? OUTWARD : INWARD;
}

private boolean isLineageType(String typeName) {
public boolean isLineageType(String typeName) {
AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);

return entityDef.getSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
Expand Down
Loading