From 84a4077c5694c34bfb98dc7ab9bfc279d56e25f0 Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Sat, 22 Jul 2023 00:03:07 +0530 Subject: [PATCH 01/10] GOV-967 Move Terms, Categories across Glossaries --- .../java/org/apache/atlas/AtlasErrorCode.java | 2 +- .../store/graph/v2/AtlasEntityStoreV2.java | 2 +- .../v2/preprocessor/PreProcessorUtils.java | 1 + .../AbstractGlossaryPreProcessor.java | 232 ++++++++++++++ .../glossary/CategoryPreProcessor.java | 287 ++++++++++++++++-- .../glossary/TermPreProcessor.java | 184 +++++------ .../store/graph/v2/tasks/MeaningsTask.java | 4 +- .../store/graph/v2/tasks/MeaningsTasks.java | 3 +- 8 files changed, 587 insertions(+), 128 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index b5f6606f90..05a7b72b16 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -226,7 +226,7 @@ public enum AtlasErrorCode { SAVED_SEARCH_ALREADY_EXISTS(409, "ATLAS-409-00-006", "search named {0} already exists for user {1}"), GLOSSARY_ALREADY_EXISTS(409, "ATLAS-409-00-007", "Glossary with name {0} already exists"), GLOSSARY_TERM_ALREADY_EXISTS(409, "ATLAS-409-00-009", "Glossary term with name {0} already exists"), - GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with name {0} already exists"), + GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with name {0} already exists on this level"), ACHOR_UPDATION_NOT_SUPPORTED(409, "ATLAS-400-00-0010", "Anchor(glossary) change not supported"), GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"), TYPE_WITH_DISPLAY_NAME_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Given type {0} already exists"), diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index c96be39268..4f715b37f4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -1699,7 +1699,7 @@ public PreProcessor getPreProcessor(String typeName) { break; case ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE: - preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever); + preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement); break; case QUERY_ENTITY_TYPE: diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index db7a807551..657c79ad7b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -26,6 +26,7 @@ public class PreProcessorUtils { //Glossary models constants public static final String ANCHOR = "anchor"; + public static final String CATEGORY_TERMS = "terms"; public static final String CATEGORY_PARENT = "parentCategory"; public static final String CATEGORY_CHILDREN = "childrenCategories"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java new file mode 100644 index 0000000000..ade0665551 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -0,0 +1,232 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.glossary; + +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.IndexSearchParams; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask; +import org.apache.atlas.tasks.TaskManagement; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; +import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; +import static org.apache.atlas.repository.Constants.NAME; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; +import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.MEANING_NAMES_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal.Symbols.from; + +public abstract class AbstractGlossaryPreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class); + + static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean(); + + protected static final String ATTR_MEANINGS = "meanings"; + protected static final String ATTR_CATEGORIES = "categories"; + + protected final AtlasTypeRegistry typeRegistry; + protected final EntityGraphRetriever entityRetriever; + protected final TaskManagement taskManagement; + + protected EntityDiscoveryService discovery; + + AbstractGlossaryPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, TaskManagement taskManagement) { + this.entityRetriever = entityRetriever; + this.typeRegistry = typeRegistry; + this.taskManagement = taskManagement; + + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + public boolean termExists(String termName, String glossaryQName) { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("termExists"); + boolean ret = false; + + ret = AtlasGraphUtilsV2.termExists(termName, glossaryQName); + + RequestContext.get().endMetricRecord(metricRecorder); + return ret; + } + + public void createAndQueueTask(String taskType, + String currentTermName, String updatedTermName, + String termQName, String updatedTermQualifiedName, + AtlasVertex termVertex) { + String termGuid = GraphHelper.getGuid(termVertex); + String currentUser = RequestContext.getCurrentUser(); + Map taskParams = MeaningsTask.toParameters(currentTermName, updatedTermName, termQName, updatedTermQualifiedName, termGuid); + AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams); + + AtlasGraphUtilsV2.addEncodedProperty(termVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); + + RequestContext.get().queueTask(task); + } + + public boolean checkEntityTermAssociation(String termQName) throws AtlasBaseException { + List entityHeader; + entityHeader = discovery.searchUsingTermQualifiedName(0,1,termQName,null,null); + Boolean hasEntityAssociation = entityHeader != null ? true : false; + return hasEntityAssociation; + } + + public List indexSearchPaginated(Map dsl) throws AtlasBaseException { + IndexSearchParams searchParams = new IndexSearchParams(); + List ret = new ArrayList<>(); + + List sortList = new ArrayList<>(0); + sortList.add(mapOf("__timestamp", mapOf("order", "asc"))); + sortList.add(mapOf("__guid", mapOf("order", "asc"))); + dsl.put("sort", sortList); + + int from = 0; + int size = 1; + boolean hasMore = true; + do { + dsl.put("from", from); + dsl.put("size", size); + searchParams.setDsl(dsl); + + List headers = discovery.directIndexSearch(searchParams).getEntities(); + + if (CollectionUtils.isNotEmpty(headers)) { + ret.addAll(headers); + } else { + hasMore = false; + } + + from += size; + + } while (hasMore); + + return ret; + } + + public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermName, String updatedTermName, String termQName, String updatedTermQName, String termGuid) throws AtlasBaseException { + Set attributes = new HashSet(){{ + add(ATTR_MEANINGS); + }}; + + Set relationAttributes = new HashSet(){{ + add(STATE_PROPERTY_KEY); + add(NAME); + }}; + + int from = 0; + while (true) { + List entityHeaders = discovery.searchUsingTermQualifiedName(from, ELASTICSEARCH_PAGINATION_SIZE, + termQName, attributes, relationAttributes); + + if (entityHeaders == null) + break; + + for (AtlasEntityHeader entityHeader : entityHeaders) { + AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(entityHeader.getGuid()); + + if (!currentTermName.equals(updatedTermName)) { + List meanings = (List) entityHeader.getAttribute(ATTR_MEANINGS); + + String updatedMeaningsText = meanings + .stream() + .filter(x -> AtlasEntity.Status.ACTIVE.name().equals(x.getAttributes().get(STATE_PROPERTY_KEY))) + .map(x -> x.getGuid().equals(termGuid) ? updatedTermName : x.getAttributes().get(NAME).toString()) + .collect(Collectors.joining(",")); + + AtlasGraphUtilsV2.setEncodedProperty(entityVertex, MEANINGS_TEXT_PROPERTY_KEY, updatedMeaningsText); + List meaningsNames = entityVertex.getMultiValuedProperty(MEANING_NAMES_PROPERTY_KEY, String.class); + + if (meaningsNames.contains(currentTermName)) { + AtlasGraphUtilsV2.removeItemFromListPropertyValue(entityVertex, MEANING_NAMES_PROPERTY_KEY, currentTermName); + AtlasGraphUtilsV2.addListProperty(entityVertex, MEANING_NAMES_PROPERTY_KEY, updatedTermName, true); + } + } + + if (StringUtils.isNotEmpty(updatedTermQName) && !termQName.equals(updatedTermQName)) { + AtlasGraphUtilsV2.removeItemFromListPropertyValue(entityVertex, MEANINGS_PROPERTY_KEY, updatedTermQName); + AtlasGraphUtilsV2.addListProperty(entityVertex, MEANINGS_PROPERTY_KEY, updatedTermQName, true); + } + } + + from += ELASTICSEARCH_PAGINATION_SIZE; + + if (entityHeaders.size() < ELASTICSEARCH_PAGINATION_SIZE) { + break; + } + } + } + + /** + * Get all the active parents + * @param vertex entity vertex + * @param parentEdgeLabel Edge label of parent + * @return Iterator of children vertices + */ + protected Iterator getActiveParents(AtlasVertex vertex, String parentEdgeLabel) throws AtlasBaseException { + return getEdges(vertex, parentEdgeLabel, AtlasEdgeDirection.IN); + } + + /** + * Get all the active children of category + * @param vertex entity vertex + * @param childrenEdgeLabel Edge label of children + * @return Iterator of children vertices + */ + protected Iterator getActiveChildren(AtlasVertex vertex, String childrenEdgeLabel) throws AtlasBaseException { + return getEdges(vertex, childrenEdgeLabel, AtlasEdgeDirection.OUT); + } + + protected Iterator getEdges(AtlasVertex vertex, String childrenEdgeLabel, AtlasEdgeDirection direction) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("CategoryPreProcessor.getEdges"); + + try { + return vertex.query() + .direction(direction) + .label(childrenEdgeLabel) + .has(STATE_PROPERTY_KEY, ACTIVE_STATE_VALUE) + .vertices() + .iterator(); + } catch (Exception e) { + LOG.error("Error while getting active children of category for edge label " + childrenEdgeLabel, e); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index 9c4d4f735e..a652562fea 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -24,16 +24,20 @@ import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; -import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.tasks.TaskManagement; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -43,26 +47,40 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.atlas.AtlasErrorCode.BAD_REQUEST; +import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; +import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE; +import static org.apache.atlas.repository.Constants.CATEGORY_PARENT_EDGE_LABEL; +import static org.apache.atlas.repository.Constants.CATEGORY_TERMS_EDGE_LABEL; +import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.NAME; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; - -public class CategoryPreProcessor implements PreProcessor { +import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; +import static org.apache.atlas.type.Constants.CATEGORIES_PARENT_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.CATEGORIES_PROPERTY_KEY; +import static org.apache.atlas.type.Constants.GLOSSARY_PROPERTY_KEY; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal.Symbols.both; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal.Symbols.from; + +public class CategoryPreProcessor extends AbstractGlossaryPreProcessor { private static final Logger LOG = LoggerFactory.getLogger(CategoryPreProcessor.class); - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphRetriever entityRetriever; - private AtlasEntityHeader anchor; private AtlasEntityHeader parentCategory; - public CategoryPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever) { - this.entityRetriever = entityRetriever; - this.typeRegistry = typeRegistry; + public CategoryPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, + AtlasGraph graph, TaskManagement taskManagement) { + super(typeRegistry, entityRetriever, graph, taskManagement); } @Override @@ -97,14 +115,10 @@ private void processCreateCategory(AtlasEntity entity, AtlasVertex vertex) throw throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } - if (parentCategory != null) { - AtlasEntity newParent = entityRetriever.toAtlasEntity(parentCategory.getGuid()); - AtlasRelatedObjectId newAnchor = (AtlasRelatedObjectId) newParent.getRelationshipAttribute(ANCHOR); - - if (newAnchor != null && !newAnchor.getGuid().equals(anchor.getGuid())){ - throw new AtlasBaseException(AtlasErrorCode.CATEGORY_PARENT_FROM_OTHER_GLOSSARY); - } - } + //check duplicate category name + String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); + categoryExists(catName, glossaryQualifiedName); + validateParent(glossaryQualifiedName); entity.setAttribute(QUALIFIED_NAME, createQualifiedName(vertex)); AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)), @@ -124,32 +138,241 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } - AtlasEntity storeObject = entityRetriever.toAtlasEntity(vertex); - AtlasRelatedObjectId existingAnchor = (AtlasRelatedObjectId) storeObject.getRelationshipAttribute(ANCHOR); - if (existingAnchor != null && !existingAnchor.getGuid().equals(anchor.getGuid())){ - throw new AtlasBaseException(AtlasErrorCode.ACHOR_UPDATION_NOT_SUPPORTED); + AtlasEntity storedCategory = entityRetriever.toAtlasEntity(vertex); + AtlasRelatedObjectId currentGlossary = (AtlasRelatedObjectId) storedCategory.getRelationshipAttribute(ANCHOR); + AtlasVertex currentGlossaryVertex = entityRetriever.getEntityVertex(currentGlossary.getGuid()); + String currentGlossaryQualifiedName = currentGlossaryVertex.getProperty(QUALIFIED_NAME, String.class); + + String newGlossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); + + if (!currentGlossaryQualifiedName.equals(newGlossaryQualifiedName)){ + processMoveCategoryToAnotherGlossary(entity, vertex, vertexQnName); + + } else { + categoryExists(catName, newGlossaryQualifiedName); + validateChildren(entity, storedCategory); + validateParent(newGlossaryQualifiedName); + + entity.setAttribute(QUALIFIED_NAME, vertexQnName); } - if (parentCategory != null) { - AtlasEntity newParent = entityRetriever.toAtlasEntity(parentCategory.getGuid()); - AtlasRelatedObjectId newAnchor = (AtlasRelatedObjectId) newParent.getRelationshipAttribute(ANCHOR); + RequestContext.get().endMetricRecord(metricRecorder); + } - if (newAnchor != null && !newAnchor.getGuid().equals(existingAnchor.getGuid())){ - throw new AtlasBaseException(AtlasErrorCode.CATEGORY_PARENT_FROM_OTHER_GLOSSARY); + private void processMoveCategoryToAnotherGlossary(AtlasEntity category, + AtlasVertex categoryVertex, + String currentCategoryQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveCategoryToAnotherGlossary"); + + try { + if (category.hasRelationshipAttribute(CATEGORY_CHILDREN) || category.hasRelationshipAttribute(CATEGORY_TERMS)) { + throw new AtlasBaseException(BAD_REQUEST, String.format("Please do not pass relationship attributes [%s, %s] while moving Category to different Glossary", + CATEGORY_CHILDREN, CATEGORY_TERMS)); } + + AtlasObjectId targetGlossary = (AtlasObjectId) category.getRelationshipAttribute(ANCHOR); + String targetGlossaryQualifiedName; + AtlasVertex targetGlossaryVertex; + + if (targetGlossary.getUniqueAttributes() != null && targetGlossary.getUniqueAttributes().containsKey(QUALIFIED_NAME)) { + targetGlossaryVertex = entityRetriever.getEntityVertex(targetGlossary); + targetGlossaryQualifiedName = (String) targetGlossary.getUniqueAttributes().get(QUALIFIED_NAME); + } else { + targetGlossaryVertex = entityRetriever.getEntityVertex(targetGlossary.getGuid()); + targetGlossaryQualifiedName = targetGlossaryVertex.getProperty(QUALIFIED_NAME, String.class); + } + + String categoryName = (String) category.getAttribute(NAME); + + LOG.info("Moving category {} to Glossary {}", categoryName, targetGlossaryQualifiedName); + + categoryExists(categoryName , targetGlossaryQualifiedName); + validateParentForGlossaryChange(category, categoryVertex, targetGlossaryQualifiedName); + + String sourceGlossaryQualifiedName = currentCategoryQualifiedName.split("@")[1]; + String updatedQualifiedName = currentCategoryQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + + category.setAttribute(QUALIFIED_NAME, updatedQualifiedName); + + moveChildrenToAnotherGlossary(categoryVertex, updatedQualifiedName, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + + LOG.info("Moved category {} to Glossary {}", categoryName, targetGlossaryQualifiedName); + + } finally { + RequestContext.get().endMetricRecord(recorder); } + } - validateChildren(entity, storeObject); + private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, + String parentCategoryQualifiedName, + String sourceGlossaryQualifiedName, + String targetGlossaryQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildrenToAnotherGlossary"); - entity.setAttribute(QUALIFIED_NAME, vertexQnName); - RequestContext.get().endMetricRecord(metricRecorder); + + try { + LOG.info("Moving child category {} to Glossary {}", childCategoryVertex.getProperty(NAME, String.class), targetGlossaryQualifiedName); + + String currentCategoryQualifiedName = childCategoryVertex.getProperty(QUALIFIED_NAME, String.class); + String updatedQualifiedName = currentCategoryQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + + // Change cat Qname + childCategoryVertex.setProperty(QUALIFIED_NAME, updatedQualifiedName); + + //change __glossary, __parentCategory + childCategoryVertex.setProperty(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); + childCategoryVertex.setProperty(CATEGORIES_PARENT_PROPERTY_KEY, parentCategoryQualifiedName); + + // move terms to target Glossary + Iterator terms = getActiveChildren(childCategoryVertex, CATEGORY_TERMS_EDGE_LABEL); + + while (terms.hasNext()) { + AtlasVertex termVertex = terms.next(); + moveChildTermToAnotherGlossary(termVertex, updatedQualifiedName, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + } + + // Get all children categories of category + Iterator childCategories = getActiveChildren(childCategoryVertex, CATEGORY_PARENT_EDGE_LABEL); + + while (childCategories.hasNext()) { + AtlasVertex childVertex = childCategories.next(); + moveChildrenToAnotherGlossary(childVertex, updatedQualifiedName, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + } + + LOG.info("Moved child category {} to Glossary {}", childCategoryVertex.getProperty(NAME, String.class), targetGlossaryQualifiedName); + } finally { + RequestContext.get().endMetricRecord(recorder); + } } - private void validateChildren(AtlasEntity entity, AtlasEntity storeObject) throws AtlasBaseException { + public void moveChildTermToAnotherGlossary(AtlasVertex termVertex, + String parentCategoryQualifiedName, + String sourceGlossaryQualifiedName, + String targetGlossaryQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildTermToAnotherGlossary"); + + try { + String termName = termVertex.getProperty(NAME, String.class); + String termGuid = GraphHelper.getGuid(termVertex); + + LOG.info("Moving child term {} to Glossary {}", termName, targetGlossaryQualifiedName); + + //check duplicate term name + if (termExists(termName, targetGlossaryQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); + } + + //qualifiedName + String currentTermQualifiedName = termVertex.getProperty(QUALIFIED_NAME, String.class); + String updatedTermQualifiedName = currentTermQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + + termVertex.setProperty(QUALIFIED_NAME, updatedTermQualifiedName); + + // __glossary, __categories + termVertex.setProperty(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); + termVertex.setProperty(CATEGORIES_PROPERTY_KEY, parentCategoryQualifiedName); + + if (checkEntityTermAssociation(currentTermQualifiedName)) { + if (taskManagement != null && DEFERRED_ACTION_ENABLED) { + createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, termName, termName, currentTermQualifiedName, updatedTermQualifiedName, termVertex); + } else { + updateMeaningsAttributesInEntitiesOnTermUpdate(termName, termName, currentTermQualifiedName, updatedTermQualifiedName, termGuid); + } + } + + LOG.info("Moved child term {} to Glossary {}", termName, targetGlossaryQualifiedName); + } finally { + RequestContext.get().endMetricRecord(recorder); + } + } + + private void validateParentForGlossaryChange(AtlasEntity category, + AtlasVertex categoryVertex, + String targetGlossaryQualifiedName) throws AtlasBaseException { + + if (!category.hasRelationshipAttribute(CATEGORY_PARENT)) { + // parentCategory not present in payload, check in store + Iterator parentItr = getActiveParents(categoryVertex, CATEGORY_PARENT_EDGE_LABEL); + + if (parentItr.hasNext()) { + AtlasVertex parentCategory = parentItr.next(); + String parentCategoryQualifiedName = parentCategory.getProperty(QUALIFIED_NAME, String.class); + + if (!parentCategoryQualifiedName.endsWith(targetGlossaryQualifiedName)){ + throw new AtlasBaseException(AtlasErrorCode.CATEGORY_PARENT_FROM_OTHER_GLOSSARY); + } + } + } else { + validateParent(targetGlossaryQualifiedName); + } + } + + private void categoryExists(String categoryName, String glossaryQualifiedName) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("categoryExists"); + + boolean exists = false; + try { + Map dsl = mapOf("from", 0); + + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__glossary", glossaryQualifiedName))); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", categoryName))); + + + Map bool = new HashMap<>(); + if (parentCategory != null) { + String parentQname = (String) parentCategory.getAttribute(QUALIFIED_NAME); + mustClauseList.add(mapOf("term", mapOf("__parentCategory", parentQname))); + } else { + List mustNotClauseList = new ArrayList(); + mustNotClauseList.add(mapOf("exists", mapOf("field", "__parentCategory"))); + bool.put("must_not", mustNotClauseList); + } + + bool.put("must", mustClauseList); + dsl.put("query", mapOf("bool", bool)); + + List categories = indexSearchPaginated(dsl); + + if (CollectionUtils.isNotEmpty(categories)) { + for (AtlasEntityHeader category : categories) { + String name = (String) category.getAttribute(NAME); + if (categoryName.equals(name)) { + exists = true; + break; + } + } + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + if (exists) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_CATEGORY_ALREADY_EXISTS, categoryName); + } + } + + private void validateParent(String glossaryQualifiedName) throws AtlasBaseException { + // in case parent category is present, ensure it belongs to same Glossary + + if (parentCategory != null) { + String newParentGlossaryQualifiedName = (String) parentCategory.getAttribute(QUALIFIED_NAME); + + if (!newParentGlossaryQualifiedName.endsWith(glossaryQualifiedName)){ + throw new AtlasBaseException(AtlasErrorCode.CATEGORY_PARENT_FROM_OTHER_GLOSSARY); + } + } + } + + private void validateChildren(AtlasEntity entity, AtlasEntity storedCategory) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("CategoryPreProcessor.validateChildren"); + // in case new child is being added, ensure it belongs to same Glossary + List existingChildren = new ArrayList<>(); - if (storeObject != null) { - existingChildren = (List) storeObject.getRelationshipAttribute(CATEGORY_CHILDREN); + if (storedCategory != null) { + existingChildren = (List) storedCategory.getRelationshipAttribute(CATEGORY_CHILDREN); } Set existingChildrenGuids = existingChildren.stream().map(x -> x.getGuid()).collect(Collectors.toSet()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java index 72003d9e90..068d92a4d9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java @@ -34,6 +34,7 @@ import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.model.tasks.AtlasTask; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; @@ -45,47 +46,33 @@ import org.apache.atlas.tasks.TaskManagement; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; import static org.apache.atlas.type.Constants.*; @Component -public class TermPreProcessor implements PreProcessor { +public class TermPreProcessor extends AbstractGlossaryPreProcessor { private static final Logger LOG = LoggerFactory.getLogger(TermPreProcessor.class); - private static final String ATTR_MEANINGS = "meanings"; - - static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean(); - - - private final AtlasTypeRegistry typeRegistry; - private final EntityGraphRetriever entityRetriever; - private final TaskManagement taskManagement; - private EntityDiscoveryService discovery; - private AtlasEntityHeader anchor; public TermPreProcessor( AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, TaskManagement taskManagement) { - this.entityRetriever = entityRetriever; - this.typeRegistry = typeRegistry; - this.taskManagement = taskManagement; - try { - this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); - } catch (AtlasException e) { - e.printStackTrace(); - } + super(typeRegistry, entityRetriever, graph, taskManagement); } @Override @@ -119,10 +106,13 @@ private void processCreateTerm(AtlasEntity entity, AtlasVertex vertex) throws At throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } - if (termExists(termName)) { + String glossaryQName = (String) anchor.getAttribute(QUALIFIED_NAME); + if (termExists(termName, glossaryQName)) { throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); } + validateCategory(entity); + entity.setAttribute(QUALIFIED_NAME, createQualifiedName()); AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)), "create entity: type=", entity.getTypeName()); @@ -134,10 +124,7 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateTerm"); String termName = (String) entity.getAttribute(NAME); String vertexName = vertex.getProperty(NAME, String.class); - - if (!vertexName.equals(termName) && termExists(termName)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); - } + String termGuid = GraphHelper.getGuid(vertex); if (StringUtils.isEmpty(termName) || isNameInvalid(termName)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); @@ -145,100 +132,113 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At AtlasEntity storeObject = entityRetriever.toAtlasEntity(vertex); AtlasRelatedObjectId existingAnchor = (AtlasRelatedObjectId) storeObject.getRelationshipAttribute(ANCHOR); + + String termQualifiedName = vertex.getProperty(QUALIFIED_NAME, String.class); + String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); + + validateCategory(entity); + if (existingAnchor != null && !existingAnchor.getGuid().equals(anchor.getGuid())){ - throw new AtlasBaseException(AtlasErrorCode.ACHOR_UPDATION_NOT_SUPPORTED); - } + String updatedTermQualifiedName = moveTermToAnotherGlossary(entity, vertex, glossaryQualifiedName, termName, termQualifiedName); - String vertexQName = vertex.getProperty(QUALIFIED_NAME, String.class); + if (checkEntityTermAssociation(termQualifiedName)) { + if (taskManagement != null && DEFERRED_ACTION_ENABLED) { + createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, vertexName, termName, termQualifiedName, updatedTermQualifiedName, vertex); + } else { + updateMeaningsAttributesInEntitiesOnTermUpdate(vertexName, termName, termQualifiedName, updatedTermQualifiedName, termGuid); + } + } - entity.setAttribute(QUALIFIED_NAME, vertexQName); + } else { - String termGuid = GraphHelper.getGuid(vertex); + if (!vertexName.equals(termName) && termExists(termName, glossaryQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); + } + + entity.setAttribute(QUALIFIED_NAME, termQualifiedName); - if(!termName.equals(vertexName) && checkEntityTermAssociation(vertexQName)){ - if (taskManagement != null && DEFERRED_ACTION_ENABLED) { - createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, vertexName, termName, vertexQName, vertex); + if (!termName.equals(vertexName) && checkEntityTermAssociation(termQualifiedName)) { + if (taskManagement != null && DEFERRED_ACTION_ENABLED) { + createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, vertexName, termName, termQualifiedName, null, vertex); } else { - updateMeaningsNamesInEntitiesOnTermUpdate(vertexName, termName, vertexQName, termGuid); + updateMeaningsAttributesInEntitiesOnTermUpdate(vertexName, termName, termQualifiedName, null, termGuid); } + } } - RequestContext.get().endMetricRecord(metricRecorder); } - private boolean checkEntityTermAssociation(String termQName) throws AtlasBaseException{ - List entityHeader; - entityHeader = discovery.searchUsingTermQualifiedName(0,1,termQName,null,null); - Boolean hasEntityAssociation = entityHeader != null ? true : false; - return hasEntityAssociation; - } + private void validateCategory(AtlasEntity entity) throws AtlasBaseException { + String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); + if (entity.hasRelationshipAttribute(ATTR_CATEGORIES) && entity.getRelationshipAttribute(ATTR_CATEGORIES) != null) { + List categories = (List) entity.getRelationshipAttribute(ATTR_CATEGORIES); - public void updateMeaningsNamesInEntitiesOnTermUpdate(String currentTermName, String updatedTermName, String termQName, String termGuid) throws AtlasBaseException { - int from = 0; - Set attributes = new HashSet(){{ - add(ATTR_MEANINGS); - }}; - Set relationAttributes = new HashSet(){{ - add(STATE_PROPERTY_KEY); - add(NAME); - }}; - while (true) { - List entityHeaders = discovery.searchUsingTermQualifiedName(from, ELASTICSEARCH_PAGINATION_SIZE, - termQName,attributes,relationAttributes); - if (entityHeaders == null) - break; - for (AtlasEntityHeader entityHeader : entityHeaders) { - List meanings = (List) entityHeader.getAttribute(ATTR_MEANINGS); - - String updatedMeaningsText = meanings - .stream() - .filter(x -> AtlasEntity.Status.ACTIVE.name().equals(x.getAttributes().get(STATE_PROPERTY_KEY))) - .map(x -> x.getGuid().equals(termGuid) ? updatedTermName : x.getAttributes().get(NAME).toString()) - .collect(Collectors.joining(",")); - AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(entityHeader.getGuid()); - - AtlasGraphUtilsV2.setEncodedProperty(entityVertex, MEANINGS_TEXT_PROPERTY_KEY, updatedMeaningsText); - List meaningsNames = entityVertex.getMultiValuedProperty(MEANING_NAMES_PROPERTY_KEY, String.class); - - if(meaningsNames.contains(currentTermName)){ - AtlasGraphUtilsV2.removeItemFromListPropertyValue(entityVertex, MEANING_NAMES_PROPERTY_KEY, currentTermName); - AtlasGraphUtilsV2.addListProperty(entityVertex, MEANING_NAMES_PROPERTY_KEY, updatedTermName, true); + if (CollectionUtils.isNotEmpty(categories)) { + AtlasObjectId category = categories.get(0); + String categoryQualifiedName; + + if (category.getUniqueAttributes() != null && category.getUniqueAttributes().containsKey(QUALIFIED_NAME)) { + categoryQualifiedName = (String) category.getUniqueAttributes().get(QUALIFIED_NAME); + } else { + AtlasVertex categoryVertex = entityRetriever.getEntityVertex(category.getGuid()); + categoryQualifiedName = categoryVertex.getProperty(QUALIFIED_NAME, String.class); + } + + if (!categoryQualifiedName.endsWith(glossaryQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Passed category doesn't belongs to Passed Glossary"); } } - from += ELASTICSEARCH_PAGINATION_SIZE; + } + } - if (entityHeaders.size() < ELASTICSEARCH_PAGINATION_SIZE) - break; + public String moveTermToAnotherGlossary(AtlasEntity entity, AtlasVertex vertex, + String targetGlossaryQualifiedName, + String newTermName, + String currentTermQualifiedName) throws AtlasBaseException { + + //check duplicate term name + if (termExists(newTermName, targetGlossaryQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, newTermName); } - } + //qualifiedName, __u_qualifiedName + String sourceGlossaryQualifiedName = currentTermQualifiedName.split("@")[1]; - public void createAndQueueTask(String taskType,String currentTermName, String updatedTermName, String termQName, AtlasVertex termVertex) { - String termGuid = GraphHelper.getGuid(termVertex); - String currentUser = RequestContext.getCurrentUser(); - Map taskParams = MeaningsTask.toParameters(currentTermName, updatedTermName, termQName, termGuid); - AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams); + String updatedQualifiedName = currentTermQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); - AtlasGraphUtilsV2.addEncodedProperty(termVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); + entity.setAttribute(QUALIFIED_NAME, updatedQualifiedName); - RequestContext.get().queueTask(task); - } + // __glossary + entity.setAttribute(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); - private String createQualifiedName() { - return getUUID() + "@" + anchor.getAttribute(QUALIFIED_NAME); - } + // __categories + /* check whether category is passed in relationshipAttributes + -- if it is not passed, extract it from store + if category does not belong to target glossary, throw an exception + */ + if (!entity.hasRelationshipAttribute(ATTR_CATEGORIES)) { + Iterator categoriesItr = getActiveParents(vertex, CATEGORY_TERMS_EDGE_LABEL); - private boolean termExists(String termName) { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("termExists"); - boolean ret = false; - String glossaryQName = (String) anchor.getAttribute(QUALIFIED_NAME); + if (categoriesItr.hasNext()) { + AtlasVertex categoryVertex = categoriesItr.next(); - ret = AtlasGraphUtilsV2.termExists(termName, glossaryQName); + String categoryQualifiedName = categoryVertex.getProperty(QUALIFIED_NAME, String.class); - RequestContext.get().endMetricRecord(metricRecorder); - return ret; + LOG.info("categoryQualifiedName {}, targetGlossaryQualifiedName {}", categoryQualifiedName, targetGlossaryQualifiedName); + + if (!categoryQualifiedName.endsWith(targetGlossaryQualifiedName)) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Passed category doesn't belongs to Passed Glossary"); + } + } + } + + return updatedQualifiedName; + } + + private String createQualifiedName() { + return getUUID() + "@" + anchor.getAttribute(QUALIFIED_NAME); } private void setAnchor(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java index ecda5c516f..42b2bf76d7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTask.java @@ -24,6 +24,7 @@ public abstract class MeaningsTask extends AbstractTask { private static final Logger LOG = LoggerFactory.getLogger(MeaningsTask.class); protected static final String PARAM_ENTITY_GUID = "entityGuid"; protected static final String PARAM_ENTITY_QUALIFIED_NAME = "entityQName"; + protected static final String PARAM_ENTITY_UPDATED_QUALIFIED_NAME = "updatedEntityQName"; protected static final String PARAM_UPDATED_TERM_NAME = "updatedTermName"; protected static final String PARAM_CURRENT_TERM_NAME = "currentTermName"; @@ -80,10 +81,11 @@ public AtlasTask.Status perform() throws Exception { } } - public static Map toParameters(String currentTerm, String updateTerm, String termQName, String termGuid) { + public static Map toParameters(String currentTerm, String updateTerm, String termQName, String updatedTermQName, String termGuid) { return new HashMap() {{ put(PARAM_ENTITY_GUID, termGuid); put(PARAM_ENTITY_QUALIFIED_NAME, termQName); + put(PARAM_ENTITY_UPDATED_QUALIFIED_NAME, updatedTermQName); put(PARAM_CURRENT_TERM_NAME, currentTerm); put(PARAM_UPDATED_TERM_NAME, updateTerm); }}; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTasks.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTasks.java index 28a8f6033c..163748aba5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTasks.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/MeaningsTasks.java @@ -19,10 +19,11 @@ public Update(AtlasTask task, EntityGraphMapper entityGraphMapper, AtlasGraph g protected void run(Map parameters) throws AtlasBaseException { String termGuid = (String) parameters.get(PARAM_ENTITY_GUID); String termQName = (String) parameters.get(PARAM_ENTITY_QUALIFIED_NAME); + String updatedTermQName = (String) parameters.get(PARAM_ENTITY_UPDATED_QUALIFIED_NAME); String currentTermName = (String) parameters.get(PARAM_CURRENT_TERM_NAME); String updatedTermName = (String) parameters.get(PARAM_UPDATED_TERM_NAME); - preprocessor.updateMeaningsNamesInEntitiesOnTermUpdate(currentTermName, updatedTermName, termQName, termGuid); + preprocessor.updateMeaningsAttributesInEntitiesOnTermUpdate(currentTermName, updatedTermName, termQName, updatedTermQName, termGuid); } } From 3e103145c6902fe4cdb6a23bfb033f64c57e764a Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Sat, 22 Jul 2023 18:39:40 +0530 Subject: [PATCH 02/10] GOV-967 Refactorings --- .../AbstractGlossaryPreProcessor.java | 21 ++++-- .../glossary/CategoryPreProcessor.java | 68 +++++++++---------- .../glossary/TermPreProcessor.java | 49 ++++--------- 3 files changed, 59 insertions(+), 79 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index ade0665551..5dc9e9302a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -74,14 +74,19 @@ public abstract class AbstractGlossaryPreProcessor implements PreProcessor { } } - public boolean termExists(String termName, String glossaryQName) { + public void termExists(String termName, String glossaryQName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("termExists"); boolean ret = false; - ret = AtlasGraphUtilsV2.termExists(termName, glossaryQName); + try { + ret = AtlasGraphUtilsV2.termExists(termName, glossaryQName); - RequestContext.get().endMetricRecord(metricRecorder); - return ret; + if (ret) { + throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } } public void createAndQueueTask(String taskType, @@ -137,7 +142,9 @@ public List indexSearchPaginated(Map dsl) thr return ret; } - public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermName, String updatedTermName, String termQName, String updatedTermQName, String termGuid) throws AtlasBaseException { + public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermName, String updatedTermName, + String termQName, String updatedTermQName, + String termGuid) throws AtlasBaseException { Set attributes = new HashSet(){{ add(ATTR_MEANINGS); }}; @@ -177,8 +184,8 @@ public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermNam } if (StringUtils.isNotEmpty(updatedTermQName) && !termQName.equals(updatedTermQName)) { - AtlasGraphUtilsV2.removeItemFromListPropertyValue(entityVertex, MEANINGS_PROPERTY_KEY, updatedTermQName); - AtlasGraphUtilsV2.addListProperty(entityVertex, MEANINGS_PROPERTY_KEY, updatedTermQName, true); + AtlasGraphUtilsV2.removeItemFromListPropertyValue(entityVertex, MEANINGS_PROPERTY_KEY, termQName); + AtlasGraphUtilsV2.addEncodedProperty(entityVertex, MEANINGS_PROPERTY_KEY, updatedTermQName); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index a652562fea..8d7464421f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -24,15 +24,12 @@ import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; -import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; @@ -48,6 +45,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,22 +53,18 @@ import java.util.stream.Collectors; import static org.apache.atlas.AtlasErrorCode.BAD_REQUEST; -import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.CATEGORY_PARENT_EDGE_LABEL; import static org.apache.atlas.repository.Constants.CATEGORY_TERMS_EDGE_LABEL; import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.NAME; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; -import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.CATEGORIES_PARENT_PROPERTY_KEY; import static org.apache.atlas.type.Constants.CATEGORIES_PROPERTY_KEY; import static org.apache.atlas.type.Constants.GLOSSARY_PROPERTY_KEY; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal.Symbols.both; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal.Symbols.from; public class CategoryPreProcessor extends AbstractGlossaryPreProcessor { private static final Logger LOG = LoggerFactory.getLogger(CategoryPreProcessor.class); @@ -115,7 +109,6 @@ private void processCreateCategory(AtlasEntity entity, AtlasVertex vertex) throw throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } - //check duplicate category name String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); categoryExists(catName, glossaryQualifiedName); validateParent(glossaryQualifiedName); @@ -172,13 +165,11 @@ private void processMoveCategoryToAnotherGlossary(AtlasEntity category, AtlasObjectId targetGlossary = (AtlasObjectId) category.getRelationshipAttribute(ANCHOR); String targetGlossaryQualifiedName; - AtlasVertex targetGlossaryVertex; if (targetGlossary.getUniqueAttributes() != null && targetGlossary.getUniqueAttributes().containsKey(QUALIFIED_NAME)) { - targetGlossaryVertex = entityRetriever.getEntityVertex(targetGlossary); targetGlossaryQualifiedName = (String) targetGlossary.getUniqueAttributes().get(QUALIFIED_NAME); } else { - targetGlossaryVertex = entityRetriever.getEntityVertex(targetGlossary.getGuid()); + AtlasVertex targetGlossaryVertex = entityRetriever.getEntityVertex(targetGlossary.getGuid()); targetGlossaryQualifiedName = targetGlossaryVertex.getProperty(QUALIFIED_NAME, String.class); } @@ -216,7 +207,7 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, String currentCategoryQualifiedName = childCategoryVertex.getProperty(QUALIFIED_NAME, String.class); String updatedQualifiedName = currentCategoryQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); - // Change cat Qname + // Change category qualifiedName childCategoryVertex.setProperty(QUALIFIED_NAME, updatedQualifiedName); //change __glossary, __parentCategory @@ -231,7 +222,7 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, moveChildTermToAnotherGlossary(termVertex, updatedQualifiedName, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); } - // Get all children categories of category + // Get all children categories of current category Iterator childCategories = getActiveChildren(childCategoryVertex, CATEGORY_PARENT_EDGE_LABEL); while (childCategories.hasNext()) { @@ -253,23 +244,22 @@ public void moveChildTermToAnotherGlossary(AtlasVertex termVertex, try { String termName = termVertex.getProperty(NAME, String.class); - String termGuid = GraphHelper.getGuid(termVertex); + String termGuid = termVertex.getProperty(GUID_PROPERTY_KEY, String.class); LOG.info("Moving child term {} to Glossary {}", termName, targetGlossaryQualifiedName); //check duplicate term name - if (termExists(termName, targetGlossaryQualifiedName)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); - } + termExists(termName, targetGlossaryQualifiedName); - //qualifiedName String currentTermQualifiedName = termVertex.getProperty(QUALIFIED_NAME, String.class); String updatedTermQualifiedName = currentTermQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + //qualifiedName termVertex.setProperty(QUALIFIED_NAME, updatedTermQualifiedName); // __glossary, __categories termVertex.setProperty(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); + termVertex.removeProperty(CATEGORIES_PROPERTY_KEY); termVertex.setProperty(CATEGORIES_PROPERTY_KEY, parentCategoryQualifiedName); if (checkEntityTermAssociation(currentTermQualifiedName)) { @@ -312,8 +302,6 @@ private void categoryExists(String categoryName, String glossaryQualifiedName) t boolean exists = false; try { - Map dsl = mapOf("from", 0); - List mustClauseList = new ArrayList(); mustClauseList.add(mapOf("term", mapOf("__glossary", glossaryQualifiedName))); mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE))); @@ -332,7 +320,8 @@ private void categoryExists(String categoryName, String glossaryQualifiedName) t } bool.put("must", mustClauseList); - dsl.put("query", mapOf("bool", bool)); + + Map dsl = mapOf("query", mapOf("bool", bool)); List categories = indexSearchPaginated(dsl); @@ -370,27 +359,36 @@ private void validateChildren(AtlasEntity entity, AtlasEntity storedCategory) th AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("CategoryPreProcessor.validateChildren"); // in case new child is being added, ensure it belongs to same Glossary - List existingChildren = new ArrayList<>(); - if (storedCategory != null) { - existingChildren = (List) storedCategory.getRelationshipAttribute(CATEGORY_CHILDREN); - } - Set existingChildrenGuids = existingChildren.stream().map(x -> x.getGuid()).collect(Collectors.toSet()); + try { + if (entity.hasRelationshipAttribute(CATEGORY_CHILDREN) && entity.getRelationshipAttribute(CATEGORY_CHILDREN) != null) { + List children = (List) entity.getRelationshipAttribute(CATEGORY_CHILDREN); - List children = (List) entity.getRelationshipAttribute(CATEGORY_CHILDREN); + if (CollectionUtils.isNotEmpty(children)) { + Set existingChildrenGuids = new HashSet<>(); - if (CollectionUtils.isNotEmpty(children)) { - for (AtlasObjectId child : children) { - if (!existingChildrenGuids.contains(child.getGuid())) { - AtlasEntity newChild = entityRetriever.toAtlasEntity(child.getGuid()); - AtlasRelatedObjectId newAnchor = (AtlasRelatedObjectId) newChild.getRelationshipAttribute(ANCHOR); + if (storedCategory != null && + storedCategory.hasRelationshipAttribute(CATEGORY_CHILDREN) && + storedCategory.getRelationshipAttribute(CATEGORY_CHILDREN) != null) { + List existingChildren = (List) storedCategory.getRelationshipAttribute(CATEGORY_CHILDREN); - if (newAnchor != null && !newAnchor.getGuid().equals(anchor.getGuid())){ - throw new AtlasBaseException(AtlasErrorCode.CATEGORY_PARENT_FROM_OTHER_GLOSSARY); + existingChildrenGuids = existingChildren.stream().map(x -> x.getGuid()).collect(Collectors.toSet()); + } + + for (AtlasObjectId child : children) { + if (!existingChildrenGuids.contains(child.getGuid())) { + AtlasEntity newChild = entityRetriever.toAtlasEntity(child.getGuid()); + AtlasRelatedObjectId newAnchor = (AtlasRelatedObjectId) newChild.getRelationshipAttribute(ANCHOR); + + if (newAnchor != null && !newAnchor.getGuid().equals(anchor.getGuid())){ + throw new AtlasBaseException(AtlasErrorCode.CATEGORY_PARENT_FROM_OTHER_GLOSSARY); + } + } } } } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); } - RequestContext.get().endMetricRecord(metricRecorder); } private void setAnchorAndParent(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java index 068d92a4d9..73533cf944 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java @@ -18,14 +18,11 @@ package org.apache.atlas.repository.store.graph.v2.preprocessor.glossary; -import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasEntityAccessRequest; import org.apache.atlas.authorize.AtlasPrivilege; -import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; @@ -33,16 +30,10 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; -import org.apache.atlas.model.tasks.AtlasTask; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; -import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; -import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask; import org.apache.atlas.tasks.TaskManagement; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; @@ -52,16 +43,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; import static org.apache.atlas.type.Constants.*; @@ -107,9 +93,8 @@ private void processCreateTerm(AtlasEntity entity, AtlasVertex vertex) throws At } String glossaryQName = (String) anchor.getAttribute(QUALIFIED_NAME); - if (termExists(termName, glossaryQName)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); - } + + termExists(termName, glossaryQName); validateCategory(entity); @@ -124,22 +109,22 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateTerm"); String termName = (String) entity.getAttribute(NAME); String vertexName = vertex.getProperty(NAME, String.class); - String termGuid = GraphHelper.getGuid(vertex); + String termGuid = vertex.getProperty(GUID_PROPERTY_KEY, String.class); if (StringUtils.isEmpty(termName) || isNameInvalid(termName)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); } + validateCategory(entity); + AtlasEntity storeObject = entityRetriever.toAtlasEntity(vertex); AtlasRelatedObjectId existingAnchor = (AtlasRelatedObjectId) storeObject.getRelationshipAttribute(ANCHOR); String termQualifiedName = vertex.getProperty(QUALIFIED_NAME, String.class); String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); - validateCategory(entity); - if (existingAnchor != null && !existingAnchor.getGuid().equals(anchor.getGuid())){ - String updatedTermQualifiedName = moveTermToAnotherGlossary(entity, vertex, glossaryQualifiedName, termName, termQualifiedName); + String updatedTermQualifiedName = moveTermToAnotherGlossary(entity, vertex, glossaryQualifiedName, termQualifiedName); if (checkEntityTermAssociation(termQualifiedName)) { if (taskManagement != null && DEFERRED_ACTION_ENABLED) { @@ -151,8 +136,8 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At } else { - if (!vertexName.equals(termName) && termExists(termName, glossaryQualifiedName)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); + if (!vertexName.equals(termName)) { + termExists(termName, glossaryQualifiedName); } entity.setAttribute(QUALIFIED_NAME, termQualifiedName); @@ -195,28 +180,20 @@ private void validateCategory(AtlasEntity entity) throws AtlasBaseException { public String moveTermToAnotherGlossary(AtlasEntity entity, AtlasVertex vertex, String targetGlossaryQualifiedName, - String newTermName, String currentTermQualifiedName) throws AtlasBaseException { //check duplicate term name - if (termExists(newTermName, targetGlossaryQualifiedName)) { - throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, newTermName); - } + termExists((String) entity.getAttribute(NAME), targetGlossaryQualifiedName); - //qualifiedName, __u_qualifiedName String sourceGlossaryQualifiedName = currentTermQualifiedName.split("@")[1]; - String updatedQualifiedName = currentTermQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + //qualifiedName entity.setAttribute(QUALIFIED_NAME, updatedQualifiedName); - // __glossary - entity.setAttribute(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); - // __categories - /* check whether category is passed in relationshipAttributes - -- if it is not passed, extract it from store - if category does not belong to target glossary, throw an exception + /* if category is not passed in relationshipAttributes, check + whether category belongs to target glossary, if not throw an exception */ if (!entity.hasRelationshipAttribute(ATTR_CATEGORIES)) { Iterator categoriesItr = getActiveParents(vertex, CATEGORY_TERMS_EDGE_LABEL); @@ -226,8 +203,6 @@ public String moveTermToAnotherGlossary(AtlasEntity entity, AtlasVertex vertex, String categoryQualifiedName = categoryVertex.getProperty(QUALIFIED_NAME, String.class); - LOG.info("categoryQualifiedName {}, targetGlossaryQualifiedName {}", categoryQualifiedName, targetGlossaryQualifiedName); - if (!categoryQualifiedName.endsWith(targetGlossaryQualifiedName)) { throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Passed category doesn't belongs to Passed Glossary"); } From bf1b020a1a9cf9f592a96c35899b3bfaaa3facd8 Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Sat, 22 Jul 2023 20:54:21 +0530 Subject: [PATCH 03/10] GOV-967 Support Audits, Notification, Auth check --- .../AbstractGlossaryPreProcessor.java | 63 +++++++++++++++++++ .../glossary/CategoryPreProcessor.java | 29 +++++++-- .../glossary/TermPreProcessor.java | 21 ++++--- 3 files changed, 102 insertions(+), 11 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index 5dc9e9302a..04a3441975 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -4,6 +4,9 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.authorize.AtlasEntityAccessRequest; +import org.apache.atlas.authorize.AtlasPrivilege; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; @@ -21,6 +24,7 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; import org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTask; import org.apache.atlas.tasks.TaskManagement; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -40,6 +44,7 @@ import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; +import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; @@ -236,4 +241,62 @@ protected Iterator getEdges(AtlasVertex vertex, String childrenEdgeLabel, AtlasE } } + protected void isAuthorized(AtlasEntityHeader sourceGlossary, AtlasEntityHeader targetGlossary) throws AtlasBaseException { + + // source -> CREATE + UPDATE + DELETE + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, sourceGlossary), + "create on source Glossary: ", sourceGlossary.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, sourceGlossary), + "update on source Glossary: ", sourceGlossary.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, sourceGlossary), + "delete on source Glossary: ", sourceGlossary.getAttribute(NAME)); + + + // target -> CREATE + UPDATE + DELETE + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, targetGlossary), + "create on source Glossary: ", targetGlossary.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, targetGlossary), + "update on source Glossary: ", targetGlossary.getAttribute(NAME)); + + AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, targetGlossary), + "delete on source Glossary: ", targetGlossary.getAttribute(NAME)); + } + + /** + * Record the updated child entities, it will be used to send notification and store audit logs + * @param entityVertex Child entity vertex + * @param updatedAttributes Updated attributes while updating required attributes on updating collection + */ + protected void recordUpdatedChildEntities(AtlasVertex entityVertex, Map updatedAttributes) { + RequestContext requestContext = RequestContext.get(); + AtlasPerfMetrics.MetricRecorder metricRecorder = requestContext.startMetricRecord("recordUpdatedChildEntities"); + AtlasEntity entity = new AtlasEntity(); + entity = entityRetriever.mapSystemAttributes(entityVertex, entity); + entity.setAttributes(updatedAttributes); + requestContext.cacheDifferentialEntity(new AtlasEntity(entity)); + + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + //Add the min info attributes to entity header to be sent as part of notification + if(entityType != null) { + AtlasEntity finalEntity = entity; + entityType.getMinInfoAttributes().values().stream().filter(attribute -> !updatedAttributes.containsKey(attribute.getName())).forEach(attribute -> { + Object attrValue = null; + try { + attrValue = entityRetriever.getVertexAttribute(entityVertex, attribute); + } catch (AtlasBaseException e) { + LOG.error("Error while getting vertex attribute", e); + } + if(attrValue != null) { + finalEntity.setAttribute(attribute.getName(), attrValue); + } + }); + requestContext.recordEntityUpdate(new AtlasEntityHeader(finalEntity)); + } + + requestContext.endMetricRecord(metricRecorder); + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index 8d7464421f..1a6e077fbe 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -30,6 +30,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; @@ -133,13 +134,16 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw AtlasEntity storedCategory = entityRetriever.toAtlasEntity(vertex); AtlasRelatedObjectId currentGlossary = (AtlasRelatedObjectId) storedCategory.getRelationshipAttribute(ANCHOR); - AtlasVertex currentGlossaryVertex = entityRetriever.getEntityVertex(currentGlossary.getGuid()); - String currentGlossaryQualifiedName = currentGlossaryVertex.getProperty(QUALIFIED_NAME, String.class); + AtlasEntityHeader currentGlossaryHeader = entityRetriever.toAtlasEntityHeader(currentGlossary.getGuid()); + String currentGlossaryQualifiedName = (String) currentGlossaryHeader.getAttribute(QUALIFIED_NAME); String newGlossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); if (!currentGlossaryQualifiedName.equals(newGlossaryQualifiedName)){ - processMoveCategoryToAnotherGlossary(entity, vertex, vertexQnName); + //Auth check + isAuthorized(currentGlossaryHeader, anchor); + + processMoveCategoryToAnotherGlossary(entity, vertex, currentGlossaryQualifiedName, vertexQnName); } else { categoryExists(catName, newGlossaryQualifiedName); @@ -154,6 +158,7 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw private void processMoveCategoryToAnotherGlossary(AtlasEntity category, AtlasVertex categoryVertex, + String sourceGlossaryQualifiedName, String currentCategoryQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveCategoryToAnotherGlossary"); @@ -180,7 +185,6 @@ private void processMoveCategoryToAnotherGlossary(AtlasEntity category, categoryExists(categoryName , targetGlossaryQualifiedName); validateParentForGlossaryChange(category, categoryVertex, targetGlossaryQualifiedName); - String sourceGlossaryQualifiedName = currentCategoryQualifiedName.split("@")[1]; String updatedQualifiedName = currentCategoryQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); category.setAttribute(QUALIFIED_NAME, updatedQualifiedName); @@ -203,17 +207,23 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, try { LOG.info("Moving child category {} to Glossary {}", childCategoryVertex.getProperty(NAME, String.class), targetGlossaryQualifiedName); + Map updatedAttributes = new HashMap<>(); String currentCategoryQualifiedName = childCategoryVertex.getProperty(QUALIFIED_NAME, String.class); String updatedQualifiedName = currentCategoryQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); // Change category qualifiedName childCategoryVertex.setProperty(QUALIFIED_NAME, updatedQualifiedName); + updatedAttributes.put(QUALIFIED_NAME, updatedQualifiedName); //change __glossary, __parentCategory childCategoryVertex.setProperty(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); childCategoryVertex.setProperty(CATEGORIES_PARENT_PROPERTY_KEY, parentCategoryQualifiedName); + //update system properties + GraphHelper.setModifiedByAsString(childCategoryVertex, RequestContext.get().getUser()); + GraphHelper.setModifiedTime(childCategoryVertex, System.currentTimeMillis()); + // move terms to target Glossary Iterator terms = getActiveChildren(childCategoryVertex, CATEGORY_TERMS_EDGE_LABEL); @@ -230,6 +240,8 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, moveChildrenToAnotherGlossary(childVertex, updatedQualifiedName, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); } + recordUpdatedChildEntities(childCategoryVertex, updatedAttributes); + LOG.info("Moved child category {} to Glossary {}", childCategoryVertex.getProperty(NAME, String.class), targetGlossaryQualifiedName); } finally { RequestContext.get().endMetricRecord(recorder); @@ -243,6 +255,8 @@ public void moveChildTermToAnotherGlossary(AtlasVertex termVertex, AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("moveChildTermToAnotherGlossary"); try { + Map updatedAttributes = new HashMap<>(); + String termName = termVertex.getProperty(NAME, String.class); String termGuid = termVertex.getProperty(GUID_PROPERTY_KEY, String.class); @@ -256,12 +270,17 @@ public void moveChildTermToAnotherGlossary(AtlasVertex termVertex, //qualifiedName termVertex.setProperty(QUALIFIED_NAME, updatedTermQualifiedName); + updatedAttributes.put(QUALIFIED_NAME, updatedTermQualifiedName); // __glossary, __categories termVertex.setProperty(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); termVertex.removeProperty(CATEGORIES_PROPERTY_KEY); termVertex.setProperty(CATEGORIES_PROPERTY_KEY, parentCategoryQualifiedName); + //update system properties + GraphHelper.setModifiedByAsString(termVertex, RequestContext.get().getUser()); + GraphHelper.setModifiedTime(termVertex, System.currentTimeMillis()); + if (checkEntityTermAssociation(currentTermQualifiedName)) { if (taskManagement != null && DEFERRED_ACTION_ENABLED) { createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, termName, termName, currentTermQualifiedName, updatedTermQualifiedName, termVertex); @@ -270,6 +289,8 @@ public void moveChildTermToAnotherGlossary(AtlasVertex termVertex, } } + recordUpdatedChildEntities(termVertex, updatedAttributes); + LOG.info("Moved child term {} to Glossary {}", termName, targetGlossaryQualifiedName); } finally { RequestContext.get().endMetricRecord(recorder); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java index 73533cf944..df81586ad3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java @@ -117,14 +117,20 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At validateCategory(entity); - AtlasEntity storeObject = entityRetriever.toAtlasEntity(vertex); - AtlasRelatedObjectId existingAnchor = (AtlasRelatedObjectId) storeObject.getRelationshipAttribute(ANCHOR); + AtlasEntity storedTerm = entityRetriever.toAtlasEntity(vertex); + AtlasRelatedObjectId currentGlossary = (AtlasRelatedObjectId) storedTerm.getRelationshipAttribute(ANCHOR); + AtlasEntityHeader currentGlossaryHeader = entityRetriever.toAtlasEntityHeader(currentGlossary.getGuid()); + String currentGlossaryQualifiedName = (String) currentGlossaryHeader.getAttribute(QUALIFIED_NAME); String termQualifiedName = vertex.getProperty(QUALIFIED_NAME, String.class); - String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); - if (existingAnchor != null && !existingAnchor.getGuid().equals(anchor.getGuid())){ - String updatedTermQualifiedName = moveTermToAnotherGlossary(entity, vertex, glossaryQualifiedName, termQualifiedName); + String newGlossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME); + + if (!currentGlossaryQualifiedName.equals(newGlossaryQualifiedName)){ + //Auth check + isAuthorized(currentGlossaryHeader, anchor); + + String updatedTermQualifiedName = moveTermToAnotherGlossary(entity, vertex, currentGlossaryQualifiedName, newGlossaryQualifiedName, termQualifiedName); if (checkEntityTermAssociation(termQualifiedName)) { if (taskManagement != null && DEFERRED_ACTION_ENABLED) { @@ -137,7 +143,7 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At } else { if (!vertexName.equals(termName)) { - termExists(termName, glossaryQualifiedName); + termExists(termName, newGlossaryQualifiedName); } entity.setAttribute(QUALIFIED_NAME, termQualifiedName); @@ -179,13 +185,14 @@ private void validateCategory(AtlasEntity entity) throws AtlasBaseException { } public String moveTermToAnotherGlossary(AtlasEntity entity, AtlasVertex vertex, + String sourceGlossaryQualifiedName, String targetGlossaryQualifiedName, String currentTermQualifiedName) throws AtlasBaseException { //check duplicate term name termExists((String) entity.getAttribute(NAME), targetGlossaryQualifiedName); - String sourceGlossaryQualifiedName = currentTermQualifiedName.split("@")[1]; + String updatedQualifiedName = currentTermQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName); //qualifiedName From c6c4887414a868f7b018064181f3ba608c9b258e Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Mon, 24 Jul 2023 10:33:46 +0530 Subject: [PATCH 04/10] GOV-967 Refacorings --- .../AbstractGlossaryPreProcessor.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index 04a3441975..e4d1f1f19f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.atlas.repository.store.graph.v2.preprocessor.glossary; import org.apache.atlas.AtlasConfiguration; @@ -9,7 +26,6 @@ import org.apache.atlas.authorize.AtlasPrivilege; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.IndexSearchParams; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; @@ -33,7 +49,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -44,14 +59,12 @@ import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; -import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; import static org.apache.atlas.type.Constants.MEANINGS_TEXT_PROPERTY_KEY; import static org.apache.atlas.type.Constants.MEANING_NAMES_PROPERTY_KEY; import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal.Symbols.from; public abstract class AbstractGlossaryPreProcessor implements PreProcessor { private static final Logger LOG = LoggerFactory.getLogger(AbstractGlossaryPreProcessor.class); @@ -125,7 +138,7 @@ public List indexSearchPaginated(Map dsl) thr dsl.put("sort", sortList); int from = 0; - int size = 1; + int size = 100; boolean hasMore = true; do { dsl.put("from", from); From a6c3cc5fcdc4a1feb97c68fe35cff3f59ee6f21b Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Mon, 24 Jul 2023 18:53:35 +0530 Subject: [PATCH 05/10] GOV-967 Fix Move back to original glossary failure --- .../AbstractGlossaryPreProcessor.java | 24 ++++++++++++++++++- .../glossary/TermPreProcessor.java | 2 +- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index e4d1f1f19f..c8cbbd4a54 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -57,8 +58,11 @@ import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; +import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE; +import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_TERM_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; +import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; @@ -97,7 +101,25 @@ public void termExists(String termName, String glossaryQName) throws AtlasBaseEx boolean ret = false; try { - ret = AtlasGraphUtilsV2.termExists(termName, glossaryQName); + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__glossary", glossaryQName))); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", ATLAS_GLOSSARY_TERM_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf("name.keyword", termName))); + + Map dsl = mapOf("query", mapOf("bool", mapOf("must", mustClauseList))); + + List terms = indexSearchPaginated(dsl); + + if (CollectionUtils.isNotEmpty(terms)) { + for (AtlasEntityHeader term : terms) { + String name = (String) term.getAttribute(NAME); + if (termName.equals(name)) { + ret = true; + break; + } + } + } if (ret) { throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java index df81586ad3..d1ccc0c278 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java @@ -109,7 +109,7 @@ private void processUpdateTerm(AtlasEntity entity, AtlasVertex vertex) throws At AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateTerm"); String termName = (String) entity.getAttribute(NAME); String vertexName = vertex.getProperty(NAME, String.class); - String termGuid = vertex.getProperty(GUID_PROPERTY_KEY, String.class); + String termGuid = entity.getGuid(); if (StringUtils.isEmpty(termName) || isNameInvalid(termName)) { throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); From 804bfc4f9c71a2e95fb2e27f5ff3c796e6d272cd Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Tue, 25 Jul 2023 13:35:44 +0530 Subject: [PATCH 06/10] GOV-967 Fix children glossary relation while moving category --- .../store/graph/v2/AtlasEntityStoreV2.java | 2 +- .../store/graph/v2/EntityGraphMapper.java | 7 ++++ .../v2/preprocessor/PreProcessorUtils.java | 2 + .../AbstractGlossaryPreProcessor.java | 3 -- .../glossary/CategoryPreProcessor.java | 41 +++++++++++++------ .../glossary/TermPreProcessor.java | 2 - 6 files changed, 39 insertions(+), 18 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 4f715b37f4..2c5aff563e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -1699,7 +1699,7 @@ public PreProcessor getPreProcessor(String typeName) { break; case ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE: - preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement); + preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper); break; case QUERY_ENTITY_TYPE: diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 10a46f78b5..d030780fc4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -36,6 +36,7 @@ import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.model.instance.EntityMutations.EntityOperation; import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.model.typedef.AtlasEntityDef; @@ -543,6 +544,12 @@ public void setCustomAttributes(AtlasVertex vertex, AtlasEntity entity) { } } + public void mapGlossaryRelationshipAttribute(AtlasAttribute attribute, AtlasObjectId glossaryObjectId, + AtlasVertex entityVertex, EntityMutationContext context) throws AtlasBaseException { + + mapAttribute(attribute, glossaryObjectId, entityVertex, EntityMutations.EntityOperation.UPDATE, context); + } + public void setLabels(AtlasVertex vertex, Set labels) throws AtlasBaseException { final Set currentLabels = getLabels(vertex); final Set addedLabels; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java index 657c79ad7b..2803562e2b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/PreProcessorUtils.java @@ -29,6 +29,8 @@ public class PreProcessorUtils { public static final String CATEGORY_TERMS = "terms"; public static final String CATEGORY_PARENT = "parentCategory"; public static final String CATEGORY_CHILDREN = "childrenCategories"; + public static final String GLOSSARY_TERM_REL_TYPE = "AtlasGlossaryTermAnchor"; + public static final String GLOSSARY_CATEGORY_REL_TYPE = "AtlasGlossaryCategoryAnchor"; //Query models constants public static final String PREFIX_QUERY_QN = "default/collection/"; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index c8cbbd4a54..333f52c7ae 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -58,11 +57,9 @@ import java.util.stream.Collectors; import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; -import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_TERM_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; -import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; import static org.apache.atlas.type.Constants.MEANINGS_PROPERTY_KEY; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index 1a6e077fbe..72bef6081f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -33,9 +33,12 @@ import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; import org.apache.atlas.tasks.TaskManagement; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; @@ -55,11 +58,13 @@ import static org.apache.atlas.AtlasErrorCode.BAD_REQUEST; import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE; +import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.CATEGORY_PARENT_EDGE_LABEL; import static org.apache.atlas.repository.Constants.CATEGORY_TERMS_EDGE_LABEL; import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.NAME; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; @@ -72,10 +77,13 @@ public class CategoryPreProcessor extends AbstractGlossaryPreProcessor { private AtlasEntityHeader anchor; private AtlasEntityHeader parentCategory; + private EntityGraphMapper entityGraphMapper; + private EntityMutationContext context; public CategoryPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, - AtlasGraph graph, TaskManagement taskManagement) { + AtlasGraph graph, TaskManagement taskManagement, EntityGraphMapper entityGraphMapper) { super(typeRegistry, entityRetriever, graph, taskManagement); + this.entityGraphMapper = entityGraphMapper; } @Override @@ -87,6 +95,8 @@ public void processAttributes(AtlasStruct entityStruct, EntityMutationContext co entityStruct.getAttribute(QUALIFIED_NAME), operation); } + this.context = context; + AtlasEntity entity = (AtlasEntity) entityStruct; AtlasVertex vertex = context.getVertex(entity.getGuid()); @@ -143,7 +153,7 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw //Auth check isAuthorized(currentGlossaryHeader, anchor); - processMoveCategoryToAnotherGlossary(entity, vertex, currentGlossaryQualifiedName, vertexQnName); + processMoveCategoryToAnotherGlossary(entity, vertex, currentGlossaryQualifiedName, newGlossaryQualifiedName, vertexQnName); } else { categoryExists(catName, newGlossaryQualifiedName); @@ -159,6 +169,7 @@ private void processUpdateCategory(AtlasEntity entity, AtlasVertex vertex) throw private void processMoveCategoryToAnotherGlossary(AtlasEntity category, AtlasVertex categoryVertex, String sourceGlossaryQualifiedName, + String targetGlossaryQualifiedName, String currentCategoryQualifiedName) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder recorder = RequestContext.get().startMetricRecord("processMoveCategoryToAnotherGlossary"); @@ -168,16 +179,6 @@ private void processMoveCategoryToAnotherGlossary(AtlasEntity category, CATEGORY_CHILDREN, CATEGORY_TERMS)); } - AtlasObjectId targetGlossary = (AtlasObjectId) category.getRelationshipAttribute(ANCHOR); - String targetGlossaryQualifiedName; - - if (targetGlossary.getUniqueAttributes() != null && targetGlossary.getUniqueAttributes().containsKey(QUALIFIED_NAME)) { - targetGlossaryQualifiedName = (String) targetGlossary.getUniqueAttributes().get(QUALIFIED_NAME); - } else { - AtlasVertex targetGlossaryVertex = entityRetriever.getEntityVertex(targetGlossary.getGuid()); - targetGlossaryQualifiedName = targetGlossaryVertex.getProperty(QUALIFIED_NAME, String.class); - } - String categoryName = (String) category.getAttribute(NAME); LOG.info("Moving category {} to Glossary {}", categoryName, targetGlossaryQualifiedName); @@ -220,6 +221,9 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, childCategoryVertex.setProperty(GLOSSARY_PROPERTY_KEY, targetGlossaryQualifiedName); childCategoryVertex.setProperty(CATEGORIES_PARENT_PROPERTY_KEY, parentCategoryQualifiedName); + // update glossary relationship + updateGlossaryRelationship(childCategoryVertex, GLOSSARY_CATEGORY_REL_TYPE); + //update system properties GraphHelper.setModifiedByAsString(childCategoryVertex, RequestContext.get().getUser()); GraphHelper.setModifiedTime(childCategoryVertex, System.currentTimeMillis()); @@ -277,6 +281,9 @@ public void moveChildTermToAnotherGlossary(AtlasVertex termVertex, termVertex.removeProperty(CATEGORIES_PROPERTY_KEY); termVertex.setProperty(CATEGORIES_PROPERTY_KEY, parentCategoryQualifiedName); + // update glossary relationship + updateGlossaryRelationship(termVertex, GLOSSARY_TERM_REL_TYPE); + //update system properties GraphHelper.setModifiedByAsString(termVertex, RequestContext.get().getUser()); GraphHelper.setModifiedTime(termVertex, System.currentTimeMillis()); @@ -456,6 +463,16 @@ private void setAnchorAndParent(AtlasEntity entity, EntityMutationContext contex RequestContext.get().endMetricRecord(metricRecorder); } + private void updateGlossaryRelationship(AtlasVertex entityVertex, String relationshipType) throws AtlasBaseException { + AtlasObjectId glossaryObjectId = new AtlasObjectId(anchor.getGuid(), ATLAS_GLOSSARY_ENTITY_TYPE); + + String typeName = getTypeName(entityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + AtlasStructType.AtlasAttribute attribute = entityType.getRelationshipAttribute(ANCHOR, relationshipType); + + entityGraphMapper.mapGlossaryRelationshipAttribute(attribute, glossaryObjectId, entityVertex, context); + } + private String createQualifiedName(AtlasVertex vertex) { if (vertex != null) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java index d1ccc0c278..01f5e94e87 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java @@ -47,10 +47,8 @@ import java.util.List; import static org.apache.atlas.repository.Constants.*; -import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; -import static org.apache.atlas.type.Constants.*; @Component public class TermPreProcessor extends AbstractGlossaryPreProcessor { From 1c595d6202a180b800ccb8b85ff22f9c447e11b4 Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Tue, 25 Jul 2023 15:55:46 +0530 Subject: [PATCH 07/10] GOV-967 Fix __parentCategory attr of moved category --- .../graph/v2/preprocessor/glossary/CategoryPreProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index 72bef6081f..559b0de399 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -190,7 +190,7 @@ private void processMoveCategoryToAnotherGlossary(AtlasEntity category, category.setAttribute(QUALIFIED_NAME, updatedQualifiedName); - moveChildrenToAnotherGlossary(categoryVertex, updatedQualifiedName, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); + moveChildrenToAnotherGlossary(categoryVertex, null, sourceGlossaryQualifiedName, targetGlossaryQualifiedName); LOG.info("Moved category {} to Glossary {}", categoryName, targetGlossaryQualifiedName); From e0fb7b0ceb6d95d326b431d48708e28e12413091 Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Thu, 10 Aug 2023 20:55:25 +0530 Subject: [PATCH 08/10] GOV-967 Addessed review comments --- .../v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index 333f52c7ae..d99be6ebbc 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -143,8 +143,7 @@ public void createAndQueueTask(String taskType, public boolean checkEntityTermAssociation(String termQName) throws AtlasBaseException { List entityHeader; entityHeader = discovery.searchUsingTermQualifiedName(0,1,termQName,null,null); - Boolean hasEntityAssociation = entityHeader != null ? true : false; - return hasEntityAssociation; + return entityHeader != null; } public List indexSearchPaginated(Map dsl) throws AtlasBaseException { From 201a42bcfe7644b27c4aa5a4c62321320f08a6b7 Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Fri, 11 Aug 2023 12:30:27 +0530 Subject: [PATCH 09/10] GOV-967 Addressed review comment --- .../glossary/AbstractGlossaryPreProcessor.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index d99be6ebbc..dae44852d7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -109,13 +109,7 @@ public void termExists(String termName, String glossaryQName) throws AtlasBaseEx List terms = indexSearchPaginated(dsl); if (CollectionUtils.isNotEmpty(terms)) { - for (AtlasEntityHeader term : terms) { - String name = (String) term.getAttribute(NAME); - if (termName.equals(name)) { - ret = true; - break; - } - } + ret = terms.stream().map(term -> (String) term.getAttribute(NAME)).anyMatch(name -> termName.equals(name)); } if (ret) { From c9cc6f3ceb37751818d3b9ee1b8e9e20a3e6e637 Mon Sep 17 00:00:00 2001 From: Nikhil P Bonte Date: Fri, 11 Aug 2023 15:56:32 +0530 Subject: [PATCH 10/10] GOV-967 Addressed review comment --- .../atlas/repository/graph/GraphHelper.java | 39 +++++++++++++++++ .../AbstractGlossaryPreProcessor.java | 42 ------------------- .../glossary/CategoryPreProcessor.java | 8 ++-- .../glossary/TermPreProcessor.java | 3 +- 4 files changed, 46 insertions(+), 46 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 5a1905108c..aca49380ef 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -1869,6 +1869,45 @@ public static String getDelimitedClassificationNames(Collection classifi return ret; } + /** + * Get all the active parents + * @param vertex entity vertex + * @param parentEdgeLabel Edge label of parent + * @return Iterator of children vertices + */ + public static Iterator getActiveParentVertices(AtlasVertex vertex, String parentEdgeLabel) throws AtlasBaseException { + return getActiveVertices(vertex, parentEdgeLabel, AtlasEdgeDirection.IN); + } + + /** + * Get all the active children of category + * @param vertex entity vertex + * @param childrenEdgeLabel Edge label of children + * @return Iterator of children vertices + */ + public static Iterator getActiveChildrenVertices(AtlasVertex vertex, String childrenEdgeLabel) throws AtlasBaseException { + return getActiveVertices(vertex, childrenEdgeLabel, AtlasEdgeDirection.OUT); + } + + public static Iterator getActiveVertices(AtlasVertex vertex, String childrenEdgeLabel, AtlasEdgeDirection direction) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("CategoryPreProcessor.getEdges"); + + try { + return vertex.query() + .direction(direction) + .label(childrenEdgeLabel) + .has(STATE_PROPERTY_KEY, ACTIVE_STATE_VALUE) + .vertices() + .iterator(); + } catch (Exception e) { + LOG.error("Error while getting active children of category for edge label " + childrenEdgeLabel, e); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + private static Set parseLabelsString(String labels) { Set ret = new HashSet<>(); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java index dae44852d7..91950f783c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/AbstractGlossaryPreProcessor.java @@ -32,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.tasks.AtlasTask; import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; @@ -50,13 +49,11 @@ import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.atlas.repository.Constants.ACTIVE_STATE_VALUE; import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_TERM_ENTITY_TYPE; import static org.apache.atlas.repository.Constants.ELASTICSEARCH_PAGINATION_SIZE; import static org.apache.atlas.repository.Constants.NAME; @@ -227,45 +224,6 @@ public void updateMeaningsAttributesInEntitiesOnTermUpdate(String currentTermNam } } - /** - * Get all the active parents - * @param vertex entity vertex - * @param parentEdgeLabel Edge label of parent - * @return Iterator of children vertices - */ - protected Iterator getActiveParents(AtlasVertex vertex, String parentEdgeLabel) throws AtlasBaseException { - return getEdges(vertex, parentEdgeLabel, AtlasEdgeDirection.IN); - } - - /** - * Get all the active children of category - * @param vertex entity vertex - * @param childrenEdgeLabel Edge label of children - * @return Iterator of children vertices - */ - protected Iterator getActiveChildren(AtlasVertex vertex, String childrenEdgeLabel) throws AtlasBaseException { - return getEdges(vertex, childrenEdgeLabel, AtlasEdgeDirection.OUT); - } - - protected Iterator getEdges(AtlasVertex vertex, String childrenEdgeLabel, AtlasEdgeDirection direction) throws AtlasBaseException { - AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("CategoryPreProcessor.getEdges"); - - try { - return vertex.query() - .direction(direction) - .label(childrenEdgeLabel) - .has(STATE_PROPERTY_KEY, ACTIVE_STATE_VALUE) - .vertices() - .iterator(); - } catch (Exception e) { - LOG.error("Error while getting active children of category for edge label " + childrenEdgeLabel, e); - throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); - } - finally { - RequestContext.get().endMetricRecord(metricRecorder); - } - } - protected void isAuthorized(AtlasEntityHeader sourceGlossary, AtlasEntityHeader targetGlossary) throws AtlasBaseException { // source -> CREATE + UPDATE + DELETE diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java index 559b0de399..c12f382384 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/CategoryPreProcessor.java @@ -64,6 +64,8 @@ import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.NAME; import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; +import static org.apache.atlas.repository.graph.GraphHelper.getActiveChildrenVertices; +import static org.apache.atlas.repository.graph.GraphHelper.getActiveParentVertices; import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; @@ -229,7 +231,7 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, GraphHelper.setModifiedTime(childCategoryVertex, System.currentTimeMillis()); // move terms to target Glossary - Iterator terms = getActiveChildren(childCategoryVertex, CATEGORY_TERMS_EDGE_LABEL); + Iterator terms = getActiveChildrenVertices(childCategoryVertex, CATEGORY_TERMS_EDGE_LABEL); while (terms.hasNext()) { AtlasVertex termVertex = terms.next(); @@ -237,7 +239,7 @@ private void moveChildrenToAnotherGlossary(AtlasVertex childCategoryVertex, } // Get all children categories of current category - Iterator childCategories = getActiveChildren(childCategoryVertex, CATEGORY_PARENT_EDGE_LABEL); + Iterator childCategories = getActiveChildrenVertices(childCategoryVertex, CATEGORY_PARENT_EDGE_LABEL); while (childCategories.hasNext()) { AtlasVertex childVertex = childCategories.next(); @@ -310,7 +312,7 @@ private void validateParentForGlossaryChange(AtlasEntity category, if (!category.hasRelationshipAttribute(CATEGORY_PARENT)) { // parentCategory not present in payload, check in store - Iterator parentItr = getActiveParents(categoryVertex, CATEGORY_PARENT_EDGE_LABEL); + Iterator parentItr = getActiveParentVertices(categoryVertex, CATEGORY_PARENT_EDGE_LABEL); if (parentItr.hasNext()) { AtlasVertex parentCategory = parentItr.next(); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java index 01f5e94e87..53e12ea93e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/glossary/TermPreProcessor.java @@ -47,6 +47,7 @@ import java.util.List; import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.graph.GraphHelper.getActiveParentVertices; import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.*; import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE; @@ -201,7 +202,7 @@ public String moveTermToAnotherGlossary(AtlasEntity entity, AtlasVertex vertex, whether category belongs to target glossary, if not throw an exception */ if (!entity.hasRelationshipAttribute(ATTR_CATEGORIES)) { - Iterator categoriesItr = getActiveParents(vertex, CATEGORY_TERMS_EDGE_LABEL); + Iterator categoriesItr = getActiveParentVertices(vertex, CATEGORY_TERMS_EDGE_LABEL); if (categoriesItr.hasNext()) { AtlasVertex categoryVertex = categoriesItr.next();