Skip to content

Commit e49f23f

Browse files
author
chaitalithombare
committed
ATLAS-4889: Incremental export : When an entity has tag propagated and is exported , the tag is not propagated to it in the export.
1 parent 7502cec commit e49f23f

File tree

2 files changed

+158
-10
lines changed

2 files changed

+158
-10
lines changed

repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java

Lines changed: 157 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.atlas.glossary.GlossaryService;
2424
import org.apache.atlas.model.impexp.AtlasExportRequest;
2525
import org.apache.atlas.model.impexp.AtlasExportResult;
26+
import org.apache.atlas.model.instance.AtlasClassification;
2627
import org.apache.atlas.model.instance.AtlasEntity;
2728
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
2829
import org.apache.atlas.model.instance.AtlasObjectId;
@@ -34,6 +35,8 @@
3435
import org.apache.atlas.model.typedef.AtlasStructDef;
3536
import org.apache.atlas.model.typedef.AtlasTypesDef;
3637
import org.apache.atlas.repository.graph.GraphHelper;
38+
import org.apache.atlas.repository.graphdb.AtlasEdge;
39+
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
3740
import org.apache.atlas.repository.graphdb.AtlasGraph;
3841
import org.apache.atlas.repository.graphdb.AtlasVertex;
3942
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
@@ -43,6 +46,7 @@
4346
import org.apache.atlas.util.AtlasGremlinQueryProvider;
4447
import org.apache.commons.collections.CollectionUtils;
4548
import org.apache.commons.collections.MapUtils;
49+
import org.apache.commons.lang3.StringUtils;
4650
import org.slf4j.Logger;
4751
import org.slf4j.LoggerFactory;
4852
import org.springframework.stereotype.Component;
@@ -52,15 +56,22 @@
5256
import java.util.ArrayList;
5357
import java.util.HashMap;
5458
import java.util.HashSet;
59+
import java.util.Iterator;
5560
import java.util.List;
5661
import java.util.Map;
5762
import java.util.Set;
5863

5964
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
6065
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
6166
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
67+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
68+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
69+
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
6270
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
6371
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
72+
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
73+
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatedClassificationEdge;
74+
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
6475

6576
@Component
6677
public class ExportService {
@@ -91,13 +102,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
91102
long startTime = System.currentTimeMillis();
92103
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker());
93104
ExportContext context = new ExportContext(result, exportSink);
105+
RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor(typeRegistry);
94106

95107
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
96108

97109
try {
98110
LOG.info("==> export(user={}, from={})", userName, requestingIP);
99111

100-
AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
112+
AtlasExportResult.OperationStatus[] statuses = processItems(request, context, relationshipAttributesExtractor);
101113

102114
processTypesDef(context);
103115

@@ -118,7 +130,7 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
118130
return context.result;
119131
}
120132

121-
public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
133+
public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
122134
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
123135

124136
if (MapUtils.isNotEmpty(context.termsGlossary)) {
@@ -139,6 +151,10 @@ public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContex
139151

140152
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
141153
}
154+
155+
if (context.isHiveTableIncremental() && !context.visitedVertices.contains(entityWithExtInfo.getEntity().getGuid())) {
156+
getEntityGuids(entityWithExtInfo, context, relationshipAttributesExtractor);
157+
}
142158
}
143159

144160
@VisibleForTesting
@@ -219,20 +235,20 @@ private void processTypesDef(ExportContext context) {
219235
}
220236
}
221237

222-
private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) {
238+
private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
223239
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
224240
List<AtlasObjectId> itemsToExport = request.getItemsToExport();
225241

226242
for (int i = 0; i < itemsToExport.size(); i++) {
227243
AtlasObjectId item = itemsToExport.get(i);
228244

229-
statuses[i] = processObjectId(item, context);
245+
statuses[i] = processObjectId(item, context, relationshipAttributesExtractor);
230246
}
231247

232248
return statuses;
233249
}
234250

235-
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
251+
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
236252
LOG.debug("==> processObjectId({})", item);
237253

238254
try {
@@ -248,14 +264,14 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex
248264
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
249265
String typeName = GraphHelper.getTypeName(vertex);
250266
context.startingEntityType = typeName;
251-
processEntityGuid(guid, context);
267+
processEntityGuid(guid, context, relationshipAttributesExtractor);
252268
}
253269

254270
while (!context.guidsToProcess.isEmpty()) {
255271
while (!context.guidsToProcess.isEmpty()) {
256272
String guid = context.guidsToProcess.remove(0);
257273

258-
processEntityGuid(guid, context);
274+
processEntityGuid(guid, context, relationshipAttributesExtractor);
259275
}
260276

261277
if (!context.lineageToProcess.isEmpty()) {
@@ -285,7 +301,7 @@ private List<String> getStartingEntity(AtlasObjectId item, ExportContext context
285301
return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
286302
}
287303

288-
private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
304+
private void processEntityGuid(String guid, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
289305
LOG.debug("==> processEntityGuid({})", guid);
290306

291307
if (context.guidsProcessed.contains(guid)) {
@@ -299,7 +315,7 @@ private void processEntityGuid(String guid, ExportContext context) throws AtlasB
299315
} else {
300316
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
301317

302-
processEntity(entityWithExtInfo, context);
318+
processEntity(entityWithExtInfo, context, relationshipAttributesExtractor);
303319
}
304320

305321
LOG.debug("<== processEntityGuid({})", guid);
@@ -413,6 +429,131 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c
413429
context.reportProgress();
414430
}
415431

432+
public void getEntityGuids(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
433+
if (!context.classificationEntity.containsKey(entityWithExtInfo.getEntity().getGuid())) {
434+
if (CollectionUtils.isNotEmpty(entityWithExtInfo.getEntity().getClassifications())) {
435+
for (AtlasClassification c : entityWithExtInfo.getEntity().getClassifications()) {
436+
context.classificationEntity
437+
.computeIfAbsent(c.getEntityGuid(), key -> new UniqueList<>())
438+
.add(c.getTypeName());
439+
}
440+
}
441+
}
442+
String entityGuid = getClassificationLineage(entityWithExtInfo, context, relationshipAttributesExtractor);
443+
if (entityGuid != null) {
444+
if (!context.visitedVertices.contains(entityGuid)) {
445+
context.visitedVertices.add(entityGuid);
446+
getEntityGuids(new AtlasEntityWithExtInfo(entityGraphRetriever.toAtlasEntity(entityGuid)), context, relationshipAttributesExtractor);
447+
}
448+
}
449+
}
450+
451+
public String getClassificationLineage(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
452+
Iterable tagPropagationEdge = new ArrayList();
453+
Boolean isParent = false;
454+
AtlasVertex adjacentVertex = null;
455+
AtlasVertex entityVertexStart = entityGraphRetriever.getEntityVertex(entityWithExtInfo.getEntity().getGuid());
456+
if (CollectionUtils.isNotEmpty(entityWithExtInfo.getEntity().getClassifications())) {
457+
for (AtlasClassification classification : entityWithExtInfo.getEntity().getClassifications()) {
458+
if (context.propagatedEdgeMap.containsKey(getGuid(entityVertexStart)) &&
459+
context.propagatedEdgeMap.get(getGuid(entityVertexStart)).contains(classification.getTypeName())) {
460+
continue;
461+
}
462+
463+
context.propagatedEdgeMap.computeIfAbsent(entityWithExtInfo.getEntity().getGuid(), key -> new UniqueList<>())
464+
.add(classification.getTypeName());
465+
//We are iterating on classifications here again because while traversing
466+
// there could be more classifications on entities where classifications are blocked on edges
467+
468+
String direction = relationshipAttributesExtractor.isLineageType(getTypeName(entityVertexStart))
469+
? "__Process.inputs"
470+
: "__Process.outputs";
471+
472+
tagPropagationEdge = entityVertexStart.query()// found 2 edges
473+
.direction(direction.equals("__Process.inputs") ? AtlasEdgeDirection.OUT : AtlasEdgeDirection.IN)
474+
.label(direction)
475+
.edges();
476+
Iterator<AtlasEdge> iterator = tagPropagationEdge.iterator().hasNext() ? tagPropagationEdge.iterator() : null;
477+
478+
if (iterator != null) {
479+
while (iterator.hasNext()) {
480+
AtlasEdge edge = null;
481+
AtlasEdge propagationEdge = iterator.next();
482+
if (relationshipAttributesExtractor.isLineageType(getTypeName(entityVertexStart))) {
483+
edge = checkPropagatedEdge(propagationEdge.getInVertex(), classification.getTypeName(), classification.getEntityGuid(), edge);
484+
//this is getting executed for hive process for incoming edge from table
485+
}
486+
else {
487+
edge = checkPropagatedEdge(propagationEdge.getOutVertex(), classification.getTypeName(), classification.getEntityGuid(), edge);
488+
//this is checked for outvertex in case of table or other propagating entity which is not a process
489+
}
490+
if (edge == null) {
491+
isParent = checkIfParentVertex(propagationEdge.getInVertex(), classification.getTypeName());
492+
//This is checking if propagated edge is not found then entity must be a parent of a classification
493+
}
494+
495+
if (edge != null || isParent) {
496+
entityLineageProcess(entityVertexStart, context, propagationEdge,
497+
iterator, entityGraphRetriever, relationshipAttributesExtractor, classification);
498+
}
499+
}
500+
}
501+
}
502+
}
503+
return (adjacentVertex == null) ? null : getGuid(adjacentVertex);
504+
}
505+
506+
private void entityLineageProcess(AtlasVertex entityVertexStart, ExportContext context,
507+
AtlasEdge propagationEdge, Iterator<AtlasEdge> iterator,
508+
EntityGraphRetriever entityGraphRetriever,
509+
RelationshipAttributesExtractor relationshipAttributesExtractor, AtlasClassification classification) throws AtlasBaseException {
510+
if (context.classificationEntity.containsKey(getGuid(entityVertexStart)) &&
511+
context.classificationEntity.get(getGuid(entityVertexStart)).contains(classification.getTypeName())) {
512+
return;
513+
}
514+
AtlasVertex outVertex = propagationEdge.getOutVertex();
515+
AtlasVertex inVertex = propagationEdge.getInVertex();
516+
517+
AtlasVertex adjacentVertex = StringUtils.equals(outVertex.getIdForDisplay(), entityVertexStart.getIdForDisplay()) ? inVertex : outVertex;
518+
AtlasEntity entity = entityGraphRetriever.toAtlasEntity(getGuid(adjacentVertex));
519+
//Here we check if adjacent entity is parent of ongoing classification
520+
//Here we check for iterator.
521+
if (context.classificationEntity.containsKey(getGuid(adjacentVertex)) &&
522+
context.classificationEntity.get(getGuid(adjacentVertex)).contains(classification.getTypeName())
523+
|| iterator.hasNext()) {
524+
if (!context.guidsToProcess.contains(getGuid(adjacentVertex))) {
525+
context.guidsToProcess.add(getGuid(adjacentVertex));
526+
}
527+
528+
getEntityGuids(new AtlasEntityWithExtInfo(entity), context, relationshipAttributesExtractor);
529+
//we have to send this to getEntityGuids
530+
//to cover any other classifications if applied on adjacent vertex and got missed incase of blocktag
531+
}
532+
if (!context.guidsToProcess.contains(getGuid(adjacentVertex))) {
533+
context.guidsToProcess.add(getGuid(adjacentVertex));
534+
}
535+
}
536+
537+
//This method executes when a process or a table has propagated classification edge
538+
private AtlasEdge checkPropagatedEdge(AtlasVertex vertex, String classification, String guid, AtlasEdge edgeID) {
539+
if (getPropagatedClassificationEdge(vertex, classification, guid) != null) {
540+
edgeID = getPropagatedClassificationEdge(vertex, classification, guid);
541+
}
542+
return edgeID;
543+
}
544+
545+
//This method gets executed when a table/Process is a parent to a classification
546+
public Boolean checkIfParentVertex(AtlasVertex vertex, String classification) {
547+
Boolean isParent;
548+
Iterable appliedClassifications = vertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
549+
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
550+
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classification).edges();
551+
Iterator<AtlasEdge> classificationsIterator = appliedClassifications.iterator().hasNext() ? appliedClassifications.iterator() : null;
552+
isParent = classificationsIterator != null && classificationsIterator.hasNext();
553+
554+
return isParent;
555+
}
556+
416557
public enum TraversalDirection {
417558
UNKNOWN,
418559
INWARD,
@@ -460,6 +601,10 @@ static class ExportContext {
460601
final Set<String> relationshipTypes = new HashSet<>();
461602
final Set<String> businessMetadataTypes = new HashSet<>();
462603
final Map<String, String> termsGlossary = new HashMap<>();
604+
final Map<String, UniqueList<String>> classificationEntity = new HashMap<>();
605+
private final Map<String, UniqueList<String>> propagatedEdgeMap = new HashMap<>();
606+
final Set<String> visitedVertices = new HashSet<>();
607+
463608
final AtlasExportResult result;
464609
final ExportFetchType fetchType;
465610
final boolean skipLineage;
@@ -511,6 +656,9 @@ public void clear() {
511656
guidsToProcess.clear();
512657
guidsProcessed.clear();
513658
guidDirection.clear();
659+
visitedVertices.clear();
660+
classificationEntity.clear();
661+
propagatedEdgeMap.clear();
514662
startingEntityType = null;
515663
}
516664

repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private TraversalDirection getRelationshipEdgeDirection(AtlasRelatedObjectId rel
131131
return isOutEdge ? OUTWARD : INWARD;
132132
}
133133

134-
private boolean isLineageType(String typeName) {
134+
public boolean isLineageType(String typeName) {
135135
AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);
136136

137137
return entityDef.getSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);

0 commit comments

Comments
 (0)