Fixes the display name issues for tables, and tags count (#69)
* Fixes the names and tags issue for atlas

* Moved the helper functions to atlasclient module

* Updates the atlasclient package to pyatlasclient
verdan authored and feng-tao committed Aug 14, 2019
1 parent 493cb79 commit c79d191
Showing 7 changed files with 84 additions and 91 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
include requirements.txt
5 changes: 0 additions & 5 deletions metadata_service/config.py
@@ -34,11 +34,6 @@ class Config:
# The relationalAttribute name of Atlas Entity that identifies the database entity.
ATLAS_DB_ATTRIBUTE = 'db'

# FixMe: Once GUIDs are in place, then change it to 'name' (display name)
# Display name of Atlas Entities that we use for amundsen project.
# Atlas uses qualifiedName as indexed attribute. but also supports 'name' attribute.
ATLAS_NAME_ATTRIBUTE = 'qualifiedName'


class LocalConfig(Config):
DEBUG = False
77 changes: 41 additions & 36 deletions metadata_service/proxy/atlas_proxy.py
@@ -5,6 +5,7 @@
from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest
from atlasclient.models import EntityUniqueAttribute
from atlasclient.utils import parse_table_qualified_name, make_table_qualified_name
from flask import current_app as app

from metadata_service.entity.popular_table import PopularTable
@@ -18,23 +19,18 @@
LOGGER = logging.getLogger(__name__)


# noinspection PyMethodMayBeStatic
class AtlasProxy(BaseProxy):
"""
Atlas Proxy client for the amundsen metadata
{ATLAS_API_DOCS} = https://atlas.apache.org/api/v2/
"""
TABLE_ENTITY = app.config['ATLAS_TABLE_ENTITY']
DB_ATTRIBUTE = app.config['ATLAS_DB_ATTRIBUTE']
NAME_ATTRIBUTE = app.config['ATLAS_NAME_ATTRIBUTE']
QN_KEY = 'qualifiedName'
ATTRS_KEY = 'attributes'
REL_ATTRS_KEY = 'relationshipAttributes'

# Table Qualified Name Regex
TABLE_QN_REGEX = pattern = re.compile(r"""
^(?P<db_name>.*?)\.(?P<table_name>.*)@(?P<cluster_name>.*?)$
""", re.X)

def __init__(self, *,
host: str,
port: int,
@@ -77,11 +73,10 @@ def _extract_info_from_uri(self, *, table_uri: str) -> Dict:
Extracts the table information from table_uri coming from frontend.
:param table_uri:
:return: Dictionary object, containing following information:
entity: Database Namespace: rdbms_table, hive_table etc.
entity: Type of entity example: rdbms_table, hive_table etc.
cluster: Cluster information
db: Database Name
name: Unique Table Identifier
name: Table Name
"""
pattern = re.compile(r"""
^ (?P<entity>.*?)
@@ -104,14 +99,17 @@ def _get_table_entity(self, *, table_uri: str) -> Tuple[EntityUniqueAttribute, D
that can be used for update purposes,
while entity_unique_attribute().entity only returns the dictionary
:param table_uri:
:return:
:return: A tuple of Table entity and parsed information of table qualified name
"""
table_info = self._extract_info_from_uri(table_uri=table_uri)
table_qn = make_table_qualified_name(table_info.get('name'),
table_info.get('cluster'),
table_info.get('db')
)

try:
return self._driver.entity_unique_attribute(
table_info['entity'],
qualifiedName=table_info.get('name')), table_info
table_info['entity'], qualifiedName=table_qn), table_info
except Exception as ex:
LOGGER.exception(f'Table not found. {str(ex)}')
raise NotFoundException('Table URI( {table_uri} ) does not exist'
Expand All @@ -129,7 +127,7 @@ def _get_column(self, *, table_uri: str, column_name: str) -> Dict:
columns = table_entity.entity[self.REL_ATTRS_KEY].get('columns')
for column in columns or list():
col_details = table_entity.referredEntities[column['guid']]
if column_name == col_details[self.ATTRS_KEY][self.NAME_ATTRIBUTE]:
if column_name == col_details[self.ATTRS_KEY]['name']:
return col_details

raise NotFoundException(f'Column not found: {column_name}')
@@ -166,8 +164,8 @@ def _serialize_columns(self, *, entity: EntityUniqueAttribute) -> \

columns.append(
Column(
name=col_attrs.get(self.NAME_ATTRIBUTE),
description=col_attrs.get('description'),
name=col_attrs.get('name'),
description=col_attrs.get('description') or col_attrs.get('comment'),
col_type=col_attrs.get('type') or col_attrs.get('dataType'),
sort_order=col_attrs.get('position'),
stats=statistics,
@@ -191,6 +189,10 @@ def get_table(self, *, table_uri: str) -> Table:
try:
attrs = table_details[self.ATTRS_KEY]

table_qn = parse_table_qualified_name(
qualified_name=attrs.get(self.QN_KEY)
)

tags = []
# Using or in case, if the key 'classifications' is there with a None
for classification in table_details.get("classifications") or list():
@@ -203,15 +205,16 @@ def get_table(self, *, table_uri: str) -> Table:

columns = self._serialize_columns(entity=entity)

table = Table(database=table_info['entity'],
cluster=table_info['cluster'],
schema=table_info['db'],
name=table_info['name'],
tags=tags,
description=attrs.get('description'),
owners=[User(email=attrs.get('owner'))],
columns=columns,
last_updated_timestamp=table_details.get('updateTime'))
table = Table(
database=table_details.get('typeName'),
cluster=table_qn.get('cluster_name', ''),
schema=table_qn.get('db_name', ''),
name=attrs.get('name') or table_qn.get("table_name", ''),
tags=tags,
description=attrs.get('description'),
owners=[User(email=attrs.get('owner'))],
columns=columns,
last_updated_timestamp=table_details.get('updateTime'))

return table
except KeyError as ex:
@@ -351,19 +354,20 @@ def get_popular_tables(self, *, num_entries: int) -> List[PopularTable]:
table = metadata.relationshipAttributes.get("parentEntity")
table_attrs = table.get(self.ATTRS_KEY)

_regex_result = self.TABLE_QN_REGEX.match(table_attrs.get(self.QN_KEY))
table_qn = _regex_result.groupdict() if _regex_result else dict()
table_qn = parse_table_qualified_name(
qualified_name=table_attrs.get(self.QN_KEY)
)

# Hardcoded empty strings as default, because these values are not optional
table_name = table_attrs.get(self.NAME_ATTRIBUTE) or table_qn.get("table_name", '')
table_name = table_qn.get("table_name") or table_attrs.get('name')
db_name = table_qn.get("db_name", '')
db_cluster = table_qn.get("cluster_name", '')

popular_table = PopularTable(database=table.get("typeName"),
cluster=db_cluster,
schema=db_name,
name=table_name,
description=table_attrs.get('description'))
popular_table = PopularTable(
database=table.get("typeName"),
cluster=db_cluster,
schema=db_name,
name=table_name,
description=table_attrs.get('description'))
popular_tables.append(popular_table)

return popular_tables
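
The display-name fix in this file hinges on the two helpers now imported from atlasclient.utils. A minimal sketch of how they are expected to behave, assuming the usual Atlas qualified-name convention {db}.{table}@{cluster} and inferring the argument order and return keys from how the proxy calls them above (sample values are illustrative):

from atlasclient.utils import make_table_qualified_name, parse_table_qualified_name

# Build the qualified name used to look up the table entity in Atlas.
# Argument order mirrors the call in _get_table_entity: (table, cluster, db).
qualified_name = make_table_qualified_name('orders', 'gold', 'sales_db')
# expected: 'sales_db.orders@gold'

# Parse it back into its parts; the proxy reads 'db_name', 'table_name'
# and 'cluster_name' from the returned dict.
parts = parse_table_qualified_name(qualified_name=qualified_name)
# expected: {'db_name': 'sales_db', 'table_name': 'orders', 'cluster_name': 'gold'}

# The updated test_get_popular_tables_without_db suggests that when the
# qualified name carries no db/cluster component, both parts fall back to 'default'.
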
@@ -378,12 +382,13 @@ def get_tags(self) -> List:
:return: A list of TagDetail Objects
"""
tags = []
for type_def in self._driver.typedefs:
for classification in type_def.classificationDefs:
for metrics in self._driver.admin_metrics:
tag_stats = metrics.tag
for tag, count in tag_stats["tagEntities"].items():
tags.append(
TagDetail(
tag_name=classification.name,
tag_count=0 # FixMe (Verdan): Implement the tag count
tag_name=tag,
tag_count=count
)
)
return tags
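
get_tags now reads tag usage from Atlas admin metrics rather than from classification type definitions, which is what makes real tag counts possible. A small sketch of the payload shape this relies on, mirroring the mocked metrics in the updated test (treat the exact structure of admin_metrics as an assumption based on that fixture):

# Each entry yielded by self._driver.admin_metrics is assumed to expose a
# 'tag' dict whose 'tagEntities' maps tag name -> number of tagged entities.
metrics_tag = {
    'tagEntities': {
        'PII': 3,
        'NON_PII': 2,
    }
}

# The proxy turns each pair into TagDetail(tag_name=..., tag_count=...),
# so this example yields counts of 3 and 2 instead of the old hardcoded 0.
tag_counts = sorted(metrics_tag['tagEntities'].items())
# expected: [('NON_PII', 2), ('PII', 3)]
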
3 changes: 2 additions & 1 deletion requirements.txt
@@ -59,4 +59,5 @@ neo4j-driver==1.6.0
neotime==1.0.0
pytz==2018.4
statsd==3.2.1
atlasclient==0.1.7
pyatlasclient==1.0.0
beaker>=1.10.0
31 changes: 7 additions & 24 deletions setup.py
@@ -1,7 +1,12 @@
import os

from setuptools import setup, find_packages

__version__ = '1.0.16'
__version__ = '1.0.17'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
requirements = requirements_file.readlines()

setup(
name='amundsen-metadata',
@@ -12,27 +17,5 @@
maintainer_email='[email protected]',
packages=find_packages(exclude=['tests*']),
dependency_links=[],
install_requires=[
# Packages in here should rarely be pinned. This is because these
# packages (at the specified version) are required for project
# consuming this library. By pinning to a specific version you are the
# number of projects that can consume this or forcing them to
# upgrade/downgrade any dependencies pinned here in their project.
#

# Generally packages listed here are pinned to a major version range.
#
# e.g.
# Python FooBar package for foobaring
# pyfoobar>=1.0, <2.0
#
# This will allow for any consuming projects to use this library as
# long as they have a version of pyfoobar equal to or greater than 1.x
# and less than 2.x installed.
'Flask-RESTful>=0.3.6',
'neo4j-driver==1.6.0',
'beaker>=1.10.0',
'statsd>=3.2.1',
'atlasclient>=0.1.7'
]
install_requires=requirements,
)
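
setup.py now derives install_requires from requirements.txt instead of a hand-maintained list; the new MANIFEST.in entry exists so that requirements.txt ships inside source distributions, otherwise the open() above would fail when installing from an sdist. A minimal sketch of the pattern (the comment/blank-line filtering is an illustrative extra, not part of this commit):

import os

from setuptools import setup, find_packages

# Resolve requirements.txt next to setup.py so the read works both from a
# git checkout and from an unpacked sdist (which is why MANIFEST.in must
# include the file).
here = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(here, 'requirements.txt')) as requirements_file:
    requirements = [line.strip() for line in requirements_file
                    if line.strip() and not line.startswith('#')]

setup(
    name='amundsen-metadata',
    packages=find_packages(exclude=['tests*']),
    install_requires=requirements,  # e.g. ['pyatlasclient==1.0.0', ...]
)
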
1 change: 1 addition & 0 deletions tests/unit/proxy/fixtures/atlas_test_data.py
@@ -16,6 +16,7 @@ class Data:
'guid': 'DOESNT_MATTER',
'typeName': 'COLUMN',
'attributes': {
'name': 'column name',
'qualifiedName': 'column@name',
'type': 'Managed',
'description': 'column description',
57 changes: 32 additions & 25 deletions tests/unit/proxy/test_atlas_proxy.py
@@ -1,3 +1,4 @@
import copy
import unittest
from atlasclient.exceptions import BadRequest
from mock import patch, MagicMock
@@ -31,8 +32,9 @@ def __init__(self, dictionary):
return ObjectView(entity)

def _mock_get_table_entity(self, entity=None):
entity = entity or self.entity1
mocked_entity = MagicMock()
mocked_entity.entity = entity or self.entity1
mocked_entity.entity = entity
if mocked_entity.entity == self.entity1:
mocked_entity.referredEntities = {
self.test_column['guid']: self.test_column
@@ -43,7 +45,7 @@ def _mock_get_table_entity(self, entity=None):
'entity': self.entity_type,
'cluster': self.cluster,
'db': self.db,
'name': self.name
'name': entity['attributes']['name']
}))
return mocked_entity

@@ -104,15 +106,15 @@ def test_get_table(self):
end_epoch=stats['attributes']['end_epoch'],
)
)
exp_col = Column(name=col_attrs['qualifiedName'],
exp_col = Column(name=col_attrs['name'],
description='column description',
col_type='Managed',
sort_order=col_attrs['position'],
stats=exp_col_stats)
expected = Table(database=self.entity_type,
cluster=self.cluster,
schema=self.db,
name=self.name,
name=ent_attrs['name'],
tags=[Tag(tag_name=classif_name, tag_type="default")],
description=ent_attrs['description'],
owners=[User(email=ent_attrs['owner'])],
@@ -127,7 +129,7 @@ def test_get_table_not_found(self):

def test_get_table_missing_info(self):
with self.assertRaises(BadRequest):
local_entity = self.entity1.copy()
local_entity = copy.deepcopy(self.entity1)
local_entity.pop('attributes')
unique_attr_response = MagicMock()
unique_attr_response.entity = local_entity
@@ -150,19 +152,21 @@ def test_get_popular_tables(self):

expected = [
PopularTable(database=self.entity_type, cluster=self.cluster, schema=self.db,
name=ent1_attrs['qualifiedName'], description=ent1_attrs['description']),
name=ent1_attrs['name'], description=ent1_attrs['description']),
PopularTable(database=self.entity_type, cluster=self.cluster, schema=self.db,
name=ent2_attrs['qualifiedName'], description=ent1_attrs['description']),
name=ent2_attrs['name'], description=ent1_attrs['description']),
]

self.assertEqual(expected.__repr__(), response.__repr__())

# noinspection PyTypeChecker
def test_get_popular_tables_without_db(self):
meta1 = self.metadata1.copy()
meta2 = self.metadata2.copy()
meta1 = copy.deepcopy(self.metadata1)
meta2 = copy.deepcopy(self.metadata2)

for meta in [meta1, meta2]:
meta['relationshipAttributes']['parentEntity']['attributes']['qualifiedName'] = 'meta@cluster'
meta['relationshipAttributes']['parentEntity']['attributes']['qualifiedName'] = \
meta['relationshipAttributes']['parentEntity']['attributes']['name']

metadata1 = self.to_class(meta1)
metadata2 = self.to_class(meta2)
@@ -177,10 +181,10 @@ def test_get_popular_tables_without_db(self):
ent2_attrs = self.entity2['attributes']

expected = [
PopularTable(database=self.entity_type, cluster='', schema='',
name=ent1_attrs['qualifiedName'], description=ent1_attrs['description']),
PopularTable(database=self.entity_type, cluster='', schema='',
name=ent2_attrs['qualifiedName'], description=ent1_attrs['description']),
PopularTable(database=self.entity_type, cluster='default', schema='default',
name=ent1_attrs['name'], description=ent1_attrs['description']),
PopularTable(database=self.entity_type, cluster='default', schema='default',
name=ent2_attrs['name'], description=ent1_attrs['description']),
]

self.assertEqual(expected.__repr__(), response.__repr__())
@@ -203,19 +207,22 @@ def test_put_table_description(self):
description="DOESNT_MATTER")

def test_get_tags(self):
name = "DUMMY_CLASSIFICATION"
mocked_classif = MagicMock()
mocked_classif.name = name
tag_response = {
'tagEntities': {
'PII': 3,
'NON_PII': 2
}
}

mocked_def = MagicMock()
mocked_def.classificationDefs = [mocked_classif]
mocked_metrics = MagicMock()
mocked_metrics.tag = tag_response

self.proxy._driver.typedefs = [mocked_def]
self.proxy._driver.admin_metrics = [mocked_metrics]

response = self.proxy.get_tags()

expected = [TagDetail(tag_name=name, tag_count=0)]
self.assertEqual(response.__repr__(), expected.__repr__())
expected = [TagDetail(tag_name='PII', tag_count=3), TagDetail(tag_name='NON_PII', tag_count=2)]
self.assertEqual(expected.__repr__(), response.__repr__())

def test_add_tag(self):
tag = "TAG"
@@ -248,7 +255,7 @@ def test_get_column(self):
self._mock_get_table_entity()
response = self.proxy._get_column(
table_uri=self.table_uri,
column_name=self.test_column['attributes']['qualifiedName'])
column_name=self.test_column['attributes']['name'])
self.assertDictEqual(response, self.test_column)

def test_get_column_wrong_name(self):
@@ -267,13 +274,13 @@ def test_get_column_description(self):
self._mock_get_table_entity()
response = self.proxy.get_column_description(
table_uri=self.table_uri,
column_name=self.test_column['attributes']['qualifiedName'])
column_name=self.test_column['attributes']['name'])
self.assertEqual(response, self.test_column['attributes'].get('description'))

def test_put_column_description(self):
self._mock_get_table_entity()
self.proxy.put_column_description(table_uri=self.table_uri,
column_name=self.test_column['attributes']['qualifiedName'],
column_name=self.test_column['attributes']['name'],
description='DOESNT_MATTER')

