From 6e056a24cb9a7890116ea3ec4f05f18285baa507 Mon Sep 17 00:00:00 2001
From: chaitalithombare
Date: Thu, 6 Nov 2025 17:51:13 +0530
Subject: [PATCH] ATLAS-4889 : Incremental export : When an entity has tag propagated and is exported , the tag is not propagated to it in the export.

---
Review notes (not part of the commit message):
 - addEntityGuids() now remembers which guids it has already expanded, so cyclic lineage
   (e.g. a process that reads and writes the same entity) can no longer loop forever.
 - Generic type arguments and line breaks were restored after a lossy extraction. Whitespace
   inside lines was normalised by that extraction, so whitespace-only changes show identical
   -/+ lines and context lines may need `git apply --ignore-whitespace`.

 .../repository/impexp/ExportService.java      | 151 +++++++++++++++++-
 .../RelationshipAttributesExtractor.java      |   2 +-
 2 files changed, 146 insertions(+), 7 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8ccc63c1b17..3daa6e212cc 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -23,6 +23,7 @@
 import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -34,6 +35,8 @@
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
@@ -43,24 +46,36 @@
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 
 import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
 import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
 import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
 import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
 
 @Component
 public class ExportService {
@@ -91,13 +106,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
         long startTime = System.currentTimeMillis();
         AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker());
         ExportContext context = new ExportContext(result, exportSink);
+        RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor(typeRegistry);
 
         exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
 
         try {
             LOG.info("==> export(user={}, from={})", userName, requestingIP);
 
-            AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
+            AtlasExportResult.OperationStatus[] statuses = processItems(request, context, relationshipAttributesExtractor);
 
             processTypesDef(context);
 
@@ -219,20 +235,20 @@ private void processTypesDef(ExportContext context) {
         }
     }
 
-    private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) {
-        AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
-        List<AtlasObjectId> itemsToExport = request.getItemsToExport();
+    private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
+        AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
+        List<AtlasObjectId> itemsToExport = request.getItemsToExport();
 
         for (int i = 0; i < itemsToExport.size(); i++) {
             AtlasObjectId item = itemsToExport.get(i);
 
-            statuses[i] = processObjectId(item, context);
+            statuses[i] = processObjectId(item, context, relationshipAttributesExtractor);
         }
 
         return statuses;
     }
 
-    private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
+    private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
         LOG.debug("==> processObjectId({})", item);
 
         try {
@@ -266,6 +282,9 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex
 
                 context.isSkipConnectedFetch = false;
             }
+            for (String guid : entityGuids) {
+                addEntityGuids(guid, context, relationshipAttributesExtractor);
+            }
         } catch (AtlasBaseException excp) {
             LOG.error("Fetching entity failed for: {}", item, excp);
 
@@ -413,6 +432,124 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c
         context.reportProgress();
     }
 
+    /**
+     * Walks lineage edges starting at the given entity and adds to the export the neighbouring
+     * entities that carry the same classifications (ATLAS-4889).
+     *
+     * <p>For a lineage (process) type the {@code __Process.inputs} edges are fetched with direction
+     * {@code OUT}; for any other type the {@code __Process.outputs} edges are fetched with direction
+     * {@code IN}. A neighbour is queued and added to the sink when it has the classification, propagated
+     * or (only if no propagated match was found) directly applied, with the same source entity guid.
+     * Classifications of the starting entity whose source guid is in {@code context.guidsProcessed} are skipped.
+     *
+     * @param guid guid of the entity to start from
+     * @param context export context; {@code context.newAddedGuids} is used as the work queue
+     * @param relationshipAttributesExtractor used to tell lineage (process) types from other types
+     * @throws AtlasBaseException propagated from the entity retriever and sink calls
+     */
+    public void addEntityGuids(String guid, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
+        AtlasVertex adjacentVertex;
+        Iterator<AtlasEdge> entityEdges;
+        Iterator<AtlasVertex> propagateClassificationVertices;
+        Iterator<AtlasVertex> appliedClassificationVertices;
+        String fetchedClassificationGuid;
+        String processInput = "__Process.inputs";
+        String processOutput = "__Process.outputs";
+        List<AtlasClassification> processedClassifications = new ArrayList<>();
+        Set<String> expandedGuids = new HashSet<>();
+
+        AtlasVertex initialEntityVertex = entityGraphRetriever.getEntityVertex(guid);
+        for (AtlasClassification currentClassification : entityGraphRetriever.getAllClassifications(initialEntityVertex)) {
+            if (context.guidsProcessed.contains(currentClassification.getEntityGuid())) {
+                processedClassifications.add(currentClassification);
+            }
+        }
+        context.newAddedGuids.add(guid);
+        while (!context.newAddedGuids.isEmpty()) {
+            String currentGuid = context.newAddedGuids.poll();
+
+            // The queue only rejects guids that are still waiting in it, so a guid can be queued again
+            // after it was polled; expand each guid at most once so that cyclic lineage terminates.
+            if (!expandedGuids.add(currentGuid)) {
+                continue;
+            }
+
+            AtlasVertex entityVertex = entityGraphRetriever.getEntityVertex(currentGuid);
+            String entityTypeName = getTypeName(entityVertex);
+            List<AtlasClassification> classifications = entityGraphRetriever.getAllClassifications(entityVertex);
+            if (CollectionUtils.isNotEmpty(processedClassifications)) {
+                classifications.removeAll(processedClassifications);
+            }
+            if (CollectionUtils.isNotEmpty(classifications)) {
+                for (AtlasClassification classification : classifications) {
+                    String classificationName = classification.getTypeName();
+                    boolean isProcess = relationshipAttributesExtractor.isLineageType(entityTypeName);
+                    entityEdges = isProcess
+                            ? GraphHelper.getEdgesForLabel(entityVertex, processInput, OUT)
+                            : GraphHelper.getEdgesForLabel(entityVertex, processOutput, IN);
+                    while (entityEdges.hasNext()) {
+                        AtlasEdge propagationEdge = entityEdges.next();
+                        AtlasVertex outVertex = propagationEdge.getOutVertex();
+                        AtlasVertex inVertex = propagationEdge.getInVertex();
+                        adjacentVertex = StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) ? inVertex : outVertex;
+                        String adjacentGuid = getGuid(adjacentVertex);
+                        boolean isPropagated = false;
+                        propagateClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, true, classificationName);
+                        while (propagateClassificationVertices.hasNext()) {
+                            AtlasVertex classificationVertex = propagateClassificationVertices.next();
+                            fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
+                            if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
+                                addAdjacentVertices(context, adjacentGuid);
+                                isPropagated = true;
+                            }
+                        }
+                        if (!isPropagated) {
+                            appliedClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, false, classificationName);
+
+                            while (appliedClassificationVertices.hasNext()) {
+                                AtlasVertex classificationVertex = appliedClassificationVertices.next();
+                                fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);
+                                if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
+                                    addAdjacentVertices(context, adjacentGuid);
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Queries the classification vertices of {@code inVertex} when {@code isProcess} is true, otherwise
+     * of {@code outVertex}, following outgoing classification edges that have the given name and the
+     * given propagated flag.
+     */
+    private Iterator<AtlasVertex> getClassificationVertices(AtlasVertex inVertex, AtlasVertex outVertex,
+            boolean isProcess, boolean isPropagated, String name) {
+        AtlasVertex base = isProcess ? inVertex : outVertex;
+        return base.query()
+                .direction(AtlasEdgeDirection.OUT)
+                .label(CLASSIFICATION_LABEL)
+                .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated)
+                .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, name)
+                .vertices().iterator();
+    }
+
+    /**
+     * Queues the guid for further traversal unless it is already waiting in the queue, and adds the
+     * entity to the export sink unless the sink already contains that guid.
+     */
+    private void addAdjacentVertices(ExportContext context, String adjacentGuid) throws AtlasBaseException {
+        if (!context.newAddedGuids.contains(adjacentGuid)) {
+            context.newAddedGuids.add(adjacentGuid);
+        }
+        if (!context.sink.guids.contains(adjacentGuid)) {
+            context.addToSink(entityGraphRetriever.toAtlasEntityWithExtInfo(adjacentGuid));
+        }
+    }
+
     public enum TraversalDirection {
         UNKNOWN,
         INWARD,
@@ -450,6 +587,7 @@ static class ExportContext {
         final UniqueList<String> entityCreationOrder = new UniqueList<>();
         final Set<String> guidsProcessed = new HashSet<>();
         final UniqueList<String> guidsToProcess = new UniqueList<>();
+        final Queue<String> newAddedGuids = new ArrayDeque<>();
         final UniqueList<String> lineageToProcess = new UniqueList<>();
         final Set<String> lineageProcessed = new HashSet<>();
         final Map<String, TraversalDirection> guidDirection = new HashMap<>();
@@ -511,6 +649,7 @@ public void clear() {
             guidsToProcess.clear();
             guidsProcessed.clear();
             guidDirection.clear();
+            newAddedGuids.clear();
             startingEntityType = null;
         }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
index 6f42e1b85f9..857fb664eea 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
@@ -131,7 +131,7 @@ private TraversalDirection getRelationshipEdgeDirection(AtlasRelatedObjectId rel
         return isOutEdge ? OUTWARD : INWARD;
     }
 
-    private boolean isLineageType(String typeName) {
+    public boolean isLineageType(String typeName) {
         AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);
 
         return entityDef.getSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);