Skip to content

Commit

Permalink
Merge pull request #2452 from atlanhq/documenttype
Browse files Browse the repository at this point in the history
Add Document type under Term
  • Loading branch information
nikhilbonte21 authored Nov 8, 2023
2 parents e207622 + f32d7b3 commit 5797da5
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public final class Constants {
public static final String ATLAS_GLOSSARY_ENTITY_TYPE = "AtlasGlossary";
public static final String ATLAS_GLOSSARY_TERM_ENTITY_TYPE = "AtlasGlossaryTerm";
public static final String ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE = "AtlasGlossaryCategory";
public static final String DOCUMENT_ENTITY_TYPE = "Document";
public static final String CATEGORY_PARENT_EDGE_LABEL = "r:AtlasGlossaryCategoryHierarchyLink";
public static final String CATEGORY_TERMS_EDGE_LABEL = "r:AtlasGlossaryTermCategorization";
public static final String GLOSSARY_TERMS_EDGE_LABEL = "r:AtlasGlossaryTermAnchor";
Expand Down
2 changes: 1 addition & 1 deletion intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public enum AtlasErrorCode {
TYPE_HAS_RELATIONSHIPS(409, "ATLAS-409-00-005", "Given type {0} has associated relationshipDefs"),
SAVED_SEARCH_ALREADY_EXISTS(409, "ATLAS-409-00-006", "search named {0} already exists for user {1}"),
GLOSSARY_ALREADY_EXISTS(409, "ATLAS-409-00-007", "Glossary with name {0} already exists"),
GLOSSARY_TERM_ALREADY_EXISTS(409, "ATLAS-409-00-009", "Glossary term with name {0} already exists"),
GLOSSARY_TERM_ALREADY_EXISTS(409, "ATLAS-409-00-009", "Glossary {0} with name {1} already exists"),
GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with name {0} already exists on this level"),
ACHOR_UPDATION_NOT_SUPPORTED(409, "ATLAS-400-00-0010", "Anchor(glossary) change not supported"),
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.atlas.repository.store.graph.v1.RestoreHandlerV1;
import org.apache.atlas.repository.store.graph.v2.preprocessor.AuthPolicyPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.ConnectionPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.DocumentPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.accesscontrol.PersonaPreProcessor;
Expand Down Expand Up @@ -1796,6 +1797,10 @@ public PreProcessor getPreProcessor(String typeName) {
preProcessor = new TermPreProcessor(typeRegistry, entityRetriever, graph, taskManagement);
break;

case DOCUMENT_ENTITY_TYPE:
preProcessor = new DocumentPreProcessor(typeRegistry, entityRetriever, graph, taskManagement);
break;

case ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE:
preProcessor = new CategoryPreProcessor(typeRegistry, entityRetriever, graph, taskManagement, entityGraphMapper);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ private void addGlossaryAttr(AttributeMutationContext ctx, AtlasEdge edge) {
AtlasVertex toVertex = ctx.getReferringVertex();
String toVertexType = getTypeName(toVertex);

if (TYPE_TERM.equals(toVertexType) || TYPE_CATEGORY.equals(toVertexType)) {
if (TYPE_TERM.equals(toVertexType) || DOCUMENT_ENTITY_TYPE.equals(toVertexType) || TYPE_CATEGORY.equals(toVertexType)) {
// handle __glossary attribute of term or category entity
String gloQname = edge.getOutVertex().getProperty(QUALIFIED_NAME, String.class);
AtlasGraphUtilsV2.setEncodedProperty(toVertex, GLOSSARY_PROPERTY_KEY, gloQname);
Expand Down Expand Up @@ -2064,7 +2064,7 @@ private void addCategoriesToTermEntity(AttributeMutationContext ctx, List<Object
}
}

if (TYPE_TERM.equals(getTypeName(termVertex))) {
if (TYPE_TERM.equals(getTypeName(termVertex)) || DOCUMENT_ENTITY_TYPE.equals(getTypeName(termVertex))) {
List<AtlasVertex> categoryVertices = newElementsCreated.stream().map(x -> ((AtlasEdge)x).getOutVertex()).collect(Collectors.toList());
Set<String> catQnames = categoryVertices.stream().map(x -> x.getProperty(QUALIFIED_NAME, String.class)).collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ public abstract class AbstractGlossaryPreProcessor implements PreProcessor {
}
}

public void termExists(String termName, String glossaryQName) throws AtlasBaseException {
public void termExists(String termName, String glossaryQName, String entityType) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("termExists");
boolean ret = false;

try {
List mustClauseList = new ArrayList();
mustClauseList.add(mapOf("term", mapOf("__glossary", glossaryQName)));
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", ATLAS_GLOSSARY_TERM_ENTITY_TYPE)));
mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", entityType)));
mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE")));
mustClauseList.add(mapOf("term", mapOf("name.keyword", termName)));

Expand All @@ -110,7 +110,8 @@ public void termExists(String termName, String glossaryQName) throws AtlasBaseEx
}

if (ret) {
throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, termName);
String type = entityType.equals(ATLAS_GLOSSARY_TERM_ENTITY_TYPE) ? "term" : "document";
throw new AtlasBaseException(AtlasErrorCode.GLOSSARY_TERM_ALREADY_EXISTS, type, termName);
}
} finally {
RequestContext.get().endMetricRecord(metricRecorder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.apache.atlas.AtlasErrorCode.BAD_REQUEST;
import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_CATEGORY_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_TERM_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.CATEGORY_PARENT_EDGE_LABEL;
import static org.apache.atlas.repository.Constants.CATEGORY_TERMS_EDGE_LABEL;
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
Expand Down Expand Up @@ -272,7 +273,7 @@ public void moveChildTermToAnotherGlossary(AtlasVertex termVertex,
LOG.info("Moving child term {} to Glossary {}", termName, targetGlossaryQualifiedName);

//check duplicate term name
termExists(termName, targetGlossaryQualifiedName);
termExists(termName, targetGlossaryQualifiedName, ATLAS_GLOSSARY_TERM_ENTITY_TYPE);

String currentTermQualifiedName = termVertex.getProperty(QUALIFIED_NAME, String.class);
String updatedTermQualifiedName = currentTermQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.preprocessor.glossary;


import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityMutationContext;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Iterator;
import java.util.List;

import static org.apache.atlas.repository.Constants.CATEGORY_TERMS_EDGE_LABEL;
import static org.apache.atlas.repository.Constants.DOCUMENT_ENTITY_TYPE;
import static org.apache.atlas.repository.Constants.NAME;
import static org.apache.atlas.repository.Constants.QUALIFIED_NAME;
import static org.apache.atlas.repository.graph.GraphHelper.getActiveParentVertices;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.ANCHOR;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.getUUID;
import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid;
import static org.apache.atlas.repository.store.graph.v2.tasks.MeaningsTaskFactory.UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE;

@Component
public class DocumentPreProcessor extends AbstractGlossaryPreProcessor {
private static final Logger LOG = LoggerFactory.getLogger(DocumentPreProcessor.class);

private AtlasEntityHeader anchor;
public DocumentPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, TaskManagement taskManagement) {
super(typeRegistry, entityRetriever, graph, taskManagement);
}

@Override
public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, EntityMutations.EntityOperation operation) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("DocumentPreProcessor.processAttributes: pre processing {}, {}",
entityStruct.getAttribute(QUALIFIED_NAME), operation);
}

AtlasEntity entity = (AtlasEntity) entityStruct;
AtlasVertex vertex = context.getVertex(entity.getGuid());

setAnchor(entity, context);

switch (operation) {
case CREATE:
processCreateDocument(entity, vertex);
break;
case UPDATE:
processUpdateDocument(entity, vertex);
break;
}
}

private void processCreateDocument(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processCreateDocument");
String docName = (String) entity.getAttribute(NAME);

if (StringUtils.isEmpty(docName) || isNameInvalid(docName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME);
}

String glossaryQName = (String) anchor.getAttribute(QUALIFIED_NAME);

termExists(docName, glossaryQName, DOCUMENT_ENTITY_TYPE);

validateCategory(entity);

entity.setAttribute(QUALIFIED_NAME, createQualifiedName());
AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)),
"create entity: type=", entity.getTypeName());

RequestContext.get().endMetricRecord(metricRecorder);
}

private void processUpdateDocument(AtlasEntity entity, AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processUpdateDocument");
String docName = (String) entity.getAttribute(NAME);
String vertexName = vertex.getProperty(NAME, String.class);
String docGuid = entity.getGuid();

if (StringUtils.isEmpty(docName) || isNameInvalid(docName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME);
}

validateCategory(entity);

AtlasEntity storedDoc = entityRetriever.toAtlasEntity(vertex);
AtlasRelatedObjectId currentGlossary = (AtlasRelatedObjectId) storedDoc.getRelationshipAttribute(ANCHOR);
AtlasEntityHeader currentGlossaryHeader = entityRetriever.toAtlasEntityHeader(currentGlossary.getGuid());
String currentGlossaryQualifiedName = (String) currentGlossaryHeader.getAttribute(QUALIFIED_NAME);

String docQualifiedName = vertex.getProperty(QUALIFIED_NAME, String.class);

String newGlossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME);

if (!currentGlossaryQualifiedName.equals(newGlossaryQualifiedName)){
//Auth check
isAuthorized(currentGlossaryHeader, anchor);

String updatedDocQualifiedName = moveDocToAnotherGlossary(entity, vertex, currentGlossaryQualifiedName, newGlossaryQualifiedName, docQualifiedName);

if (checkEntityTermAssociation(docQualifiedName)) {
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, vertexName, docName, docQualifiedName, updatedDocQualifiedName, vertex);
} else {
updateMeaningsAttributesInEntitiesOnTermUpdate(vertexName, docName, docQualifiedName, updatedDocQualifiedName, docGuid);
}
}

} else {

if (!vertexName.equals(docName)) {
termExists(docName, newGlossaryQualifiedName, DOCUMENT_ENTITY_TYPE);
}

entity.setAttribute(QUALIFIED_NAME, docQualifiedName);

if (!docName.equals(vertexName) && checkEntityTermAssociation(docQualifiedName)) {
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
createAndQueueTask(UPDATE_ENTITY_MEANINGS_ON_TERM_UPDATE, vertexName, docName, docQualifiedName, null, vertex);
} else {
updateMeaningsAttributesInEntitiesOnTermUpdate(vertexName, docName, docQualifiedName, null, docGuid);
}
}
}

RequestContext.get().endMetricRecord(metricRecorder);
}

private void validateCategory(AtlasEntity entity) throws AtlasBaseException {
String glossaryQualifiedName = (String) anchor.getAttribute(QUALIFIED_NAME);

if (entity.hasRelationshipAttribute(ATTR_CATEGORIES) && entity.getRelationshipAttribute(ATTR_CATEGORIES) != null) {
List<AtlasObjectId> categories = (List<AtlasObjectId>) entity.getRelationshipAttribute(ATTR_CATEGORIES);

if (CollectionUtils.isNotEmpty(categories)) {
AtlasObjectId category = categories.get(0);
String categoryQualifiedName;

if (category.getUniqueAttributes() != null && category.getUniqueAttributes().containsKey(QUALIFIED_NAME)) {
categoryQualifiedName = (String) category.getUniqueAttributes().get(QUALIFIED_NAME);
} else {
AtlasVertex categoryVertex = entityRetriever.getEntityVertex(category.getGuid());
categoryQualifiedName = categoryVertex.getProperty(QUALIFIED_NAME, String.class);
}

if (!categoryQualifiedName.endsWith(glossaryQualifiedName)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Passed category doesn't belongs to Passed Glossary");
}
}
}
}

public String moveDocToAnotherGlossary(AtlasEntity entity, AtlasVertex vertex,
String sourceGlossaryQualifiedName,
String targetGlossaryQualifiedName,
String currentDocQualifiedName) throws AtlasBaseException {

//check duplicate doc name
termExists((String) entity.getAttribute(NAME), targetGlossaryQualifiedName, DOCUMENT_ENTITY_TYPE);


String updatedQualifiedName = currentDocQualifiedName.replace(sourceGlossaryQualifiedName, targetGlossaryQualifiedName);

//qualifiedName
entity.setAttribute(QUALIFIED_NAME, updatedQualifiedName);

// __categories
/* if category is not passed in relationshipAttributes, check
whether category belongs to target glossary, if not throw an exception
*/
if (!entity.hasRelationshipAttribute(ATTR_CATEGORIES)) {
Iterator<AtlasVertex> categoriesItr = getActiveParentVertices(vertex, CATEGORY_TERMS_EDGE_LABEL);

if (categoriesItr.hasNext()) {
AtlasVertex categoryVertex = categoriesItr.next();

String categoryQualifiedName = categoryVertex.getProperty(QUALIFIED_NAME, String.class);

if (!categoryQualifiedName.endsWith(targetGlossaryQualifiedName)) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Passed category doesn't belongs to Passed Glossary");
}
}
}

return updatedQualifiedName;
}

private String createQualifiedName() {
return getUUID() + "@" + anchor.getAttribute(QUALIFIED_NAME);
}

private void setAnchor(AtlasEntity entity, EntityMutationContext context) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("TermPreProcessor.setAnchor");
if (anchor == null) {
AtlasObjectId objectId = (AtlasObjectId) entity.getRelationshipAttribute(ANCHOR);

if (StringUtils.isNotEmpty(objectId.getGuid())) {
AtlasVertex vertex = context.getVertex(objectId.getGuid());

if (vertex == null) {
anchor = entityRetriever.toAtlasEntityHeader(objectId.getGuid());
} else {
anchor = entityRetriever.toAtlasEntityHeader(vertex);
}

} else if (MapUtils.isNotEmpty(objectId.getUniqueAttributes()) &&
StringUtils.isNotEmpty( (String) objectId.getUniqueAttributes().get(QUALIFIED_NAME))) {
anchor = new AtlasEntityHeader(objectId.getTypeName(), objectId.getUniqueAttributes());

}
}
RequestContext.get().endMetricRecord(metricRecorder);
}
}
Loading

0 comments on commit 5797da5

Please sign in to comment.