diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..f9bd1455 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include requirements.txt diff --git a/metadata_service/config.py b/metadata_service/config.py index a7755b10..2dc19679 100644 --- a/metadata_service/config.py +++ b/metadata_service/config.py @@ -34,11 +34,6 @@ class Config: # The relationalAttribute name of Atlas Entity that identifies the database entity. ATLAS_DB_ATTRIBUTE = 'db' - # FixMe: Once GUIDs are in place, then change it to 'name' (display name) - # Display name of Atlas Entities that we use for amundsen project. - # Atlas uses qualifiedName as indexed attribute. but also supports 'name' attribute. - ATLAS_NAME_ATTRIBUTE = 'qualifiedName' - class LocalConfig(Config): DEBUG = False diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index 15ec119a..a2d7baf4 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -5,6 +5,7 @@ from atlasclient.client import Atlas from atlasclient.exceptions import BadRequest from atlasclient.models import EntityUniqueAttribute +from atlasclient.utils import parse_table_qualified_name, make_table_qualified_name from flask import current_app as app from metadata_service.entity.popular_table import PopularTable @@ -18,6 +19,7 @@ LOGGER = logging.getLogger(__name__) +# noinspection PyMethodMayBeStatic class AtlasProxy(BaseProxy): """ Atlas Proxy client for the amundsen metadata @@ -25,16 +27,10 @@ class AtlasProxy(BaseProxy): """ TABLE_ENTITY = app.config['ATLAS_TABLE_ENTITY'] DB_ATTRIBUTE = app.config['ATLAS_DB_ATTRIBUTE'] - NAME_ATTRIBUTE = app.config['ATLAS_NAME_ATTRIBUTE'] QN_KEY = 'qualifiedName' ATTRS_KEY = 'attributes' REL_ATTRS_KEY = 'relationshipAttributes' - # Table Qualified Name Regex - TABLE_QN_REGEX = pattern = re.compile(r""" - ^(?P.*?)\.(?P.*)@(?P.*?)$ - """, re.X) - def __init__(self, *, host: str, port: int, @@ -77,11 +73,10 @@ def _extract_info_from_uri(self, *, table_uri: str) -> Dict: Extracts the table information from table_uri coming from frontend. :param table_uri: :return: Dictionary object, containing following information: - entity: Database Namespace: rdbms_table, hive_table etc. entity: Type of entity example: rdbms_table, hive_table etc. cluster: Cluster information db: Database Name - name: Unique Table Identifier + name: Table Name """ pattern = re.compile(r""" ^ (?P.*?) @@ -104,14 +99,17 @@ def _get_table_entity(self, *, table_uri: str) -> Tuple[EntityUniqueAttribute, D that can be used for update purposes, while entity_unique_attribute().entity only returns the dictionary :param table_uri: - :return: + :return: A tuple of Table entity and parsed information of table qualified name """ table_info = self._extract_info_from_uri(table_uri=table_uri) + table_qn = make_table_qualified_name(table_info.get('name'), + table_info.get('cluster'), + table_info.get('db') + ) try: return self._driver.entity_unique_attribute( - table_info['entity'], - qualifiedName=table_info.get('name')), table_info + table_info['entity'], qualifiedName=table_qn), table_info except Exception as ex: LOGGER.exception(f'Table not found. {str(ex)}') raise NotFoundException('Table URI( {table_uri} ) does not exist' @@ -129,7 +127,7 @@ def _get_column(self, *, table_uri: str, column_name: str) -> Dict: columns = table_entity.entity[self.REL_ATTRS_KEY].get('columns') for column in columns or list(): col_details = table_entity.referredEntities[column['guid']] - if column_name == col_details[self.ATTRS_KEY][self.NAME_ATTRIBUTE]: + if column_name == col_details[self.ATTRS_KEY]['name']: return col_details raise NotFoundException(f'Column not found: {column_name}') @@ -166,8 +164,8 @@ def _serialize_columns(self, *, entity: EntityUniqueAttribute) -> \ columns.append( Column( - name=col_attrs.get(self.NAME_ATTRIBUTE), - description=col_attrs.get('description'), + name=col_attrs.get('name'), + description=col_attrs.get('description') or col_attrs.get('comment'), col_type=col_attrs.get('type') or col_attrs.get('dataType'), sort_order=col_attrs.get('position'), stats=statistics, @@ -191,6 +189,10 @@ def get_table(self, *, table_uri: str) -> Table: try: attrs = table_details[self.ATTRS_KEY] + table_qn = parse_table_qualified_name( + qualified_name=attrs.get(self.QN_KEY) + ) + tags = [] # Using or in case, if the key 'classifications' is there with a None for classification in table_details.get("classifications") or list(): @@ -203,15 +205,16 @@ def get_table(self, *, table_uri: str) -> Table: columns = self._serialize_columns(entity=entity) - table = Table(database=table_info['entity'], - cluster=table_info['cluster'], - schema=table_info['db'], - name=table_info['name'], - tags=tags, - description=attrs.get('description'), - owners=[User(email=attrs.get('owner'))], - columns=columns, - last_updated_timestamp=table_details.get('updateTime')) + table = Table( + database=table_details.get('typeName'), + cluster=table_qn.get('cluster_name', ''), + schema=table_qn.get('db_name', ''), + name=attrs.get('name') or table_qn.get("table_name", ''), + tags=tags, + description=attrs.get('description'), + owners=[User(email=attrs.get('owner'))], + columns=columns, + last_updated_timestamp=table_details.get('updateTime')) return table except KeyError as ex: @@ -351,19 +354,20 @@ def get_popular_tables(self, *, num_entries: int) -> List[PopularTable]: table = metadata.relationshipAttributes.get("parentEntity") table_attrs = table.get(self.ATTRS_KEY) - _regex_result = self.TABLE_QN_REGEX.match(table_attrs.get(self.QN_KEY)) - table_qn = _regex_result.groupdict() if _regex_result else dict() + table_qn = parse_table_qualified_name( + qualified_name=table_attrs.get(self.QN_KEY) + ) - # Hardcoded empty strings as default, because these values are not optional - table_name = table_attrs.get(self.NAME_ATTRIBUTE) or table_qn.get("table_name", '') + table_name = table_qn.get("table_name") or table_attrs.get('name') db_name = table_qn.get("db_name", '') db_cluster = table_qn.get("cluster_name", '') - popular_table = PopularTable(database=table.get("typeName"), - cluster=db_cluster, - schema=db_name, - name=table_name, - description=table_attrs.get('description')) + popular_table = PopularTable( + database=table.get("typeName"), + cluster=db_cluster, + schema=db_name, + name=table_name, + description=table_attrs.get('description')) popular_tables.append(popular_table) return popular_tables @@ -378,12 +382,13 @@ def get_tags(self) -> List: :return: A list of TagDetail Objects """ tags = [] - for type_def in self._driver.typedefs: - for classification in type_def.classificationDefs: + for metrics in self._driver.admin_metrics: + tag_stats = metrics.tag + for tag, count in tag_stats["tagEntities"].items(): tags.append( TagDetail( - tag_name=classification.name, - tag_count=0 # FixMe (Verdan): Implement the tag count + tag_name=tag, + tag_count=count ) ) return tags diff --git a/requirements.txt b/requirements.txt index f498a693..e288025b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -59,4 +59,5 @@ neo4j-driver==1.6.0 neotime==1.0.0 pytz==2018.4 statsd==3.2.1 -atlasclient==0.1.7 +pyatlasclient==1.0.0 +beaker>=1.10.0 diff --git a/setup.py b/setup.py index 0e89cd6a..1b4cf15b 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,12 @@ +import os + from setuptools import setup, find_packages -__version__ = '1.0.16' +__version__ = '1.0.17' +requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt') +with open(requirements_path) as requirements_file: + requirements = requirements_file.readlines() setup( name='amundsen-metadata', @@ -12,27 +17,5 @@ maintainer_email='dev@lyft.com', packages=find_packages(exclude=['tests*']), dependency_links=[], - install_requires=[ - # Packages in here should rarely be pinned. This is because these - # packages (at the specified version) are required for project - # consuming this library. By pinning to a specific version you are the - # number of projects that can consume this or forcing them to - # upgrade/downgrade any dependencies pinned here in their project. - # - - # Generally packages listed here are pinned to a major version range. - # - # e.g. - # Python FooBar package for foobaring - # pyfoobar>=1.0, <2.0 - # - # This will allow for any consuming projects to use this library as - # long as they have a version of pyfoobar equal to or greater than 1.x - # and less than 2.x installed. - 'Flask-RESTful>=0.3.6', - 'neo4j-driver==1.6.0', - 'beaker>=1.10.0', - 'statsd>=3.2.1', - 'atlasclient>=0.1.7' - ] + install_requires=requirements, ) diff --git a/tests/unit/proxy/fixtures/atlas_test_data.py b/tests/unit/proxy/fixtures/atlas_test_data.py index d7c71702..3e74decd 100644 --- a/tests/unit/proxy/fixtures/atlas_test_data.py +++ b/tests/unit/proxy/fixtures/atlas_test_data.py @@ -16,6 +16,7 @@ class Data: 'guid': 'DOESNT_MATTER', 'typeName': 'COLUMN', 'attributes': { + 'name': 'column name', 'qualifiedName': 'column@name', 'type': 'Managed', 'description': 'column description', diff --git a/tests/unit/proxy/test_atlas_proxy.py b/tests/unit/proxy/test_atlas_proxy.py index 6b1d22c2..58a992a5 100644 --- a/tests/unit/proxy/test_atlas_proxy.py +++ b/tests/unit/proxy/test_atlas_proxy.py @@ -1,3 +1,4 @@ +import copy import unittest from atlasclient.exceptions import BadRequest from mock import patch, MagicMock @@ -31,8 +32,9 @@ def __init__(self, dictionary): return ObjectView(entity) def _mock_get_table_entity(self, entity=None): + entity = entity or self.entity1 mocked_entity = MagicMock() - mocked_entity.entity = entity or self.entity1 + mocked_entity.entity = entity if mocked_entity.entity == self.entity1: mocked_entity.referredEntities = { self.test_column['guid']: self.test_column @@ -43,7 +45,7 @@ def _mock_get_table_entity(self, entity=None): 'entity': self.entity_type, 'cluster': self.cluster, 'db': self.db, - 'name': self.name + 'name': entity['attributes']['name'] })) return mocked_entity @@ -104,7 +106,7 @@ def test_get_table(self): end_epoch=stats['attributes']['end_epoch'], ) ) - exp_col = Column(name=col_attrs['qualifiedName'], + exp_col = Column(name=col_attrs['name'], description='column description', col_type='Managed', sort_order=col_attrs['position'], @@ -112,7 +114,7 @@ def test_get_table(self): expected = Table(database=self.entity_type, cluster=self.cluster, schema=self.db, - name=self.name, + name=ent_attrs['name'], tags=[Tag(tag_name=classif_name, tag_type="default")], description=ent_attrs['description'], owners=[User(email=ent_attrs['owner'])], @@ -127,7 +129,7 @@ def test_get_table_not_found(self): def test_get_table_missing_info(self): with self.assertRaises(BadRequest): - local_entity = self.entity1.copy() + local_entity = copy.deepcopy(self.entity1) local_entity.pop('attributes') unique_attr_response = MagicMock() unique_attr_response.entity = local_entity @@ -150,19 +152,21 @@ def test_get_popular_tables(self): expected = [ PopularTable(database=self.entity_type, cluster=self.cluster, schema=self.db, - name=ent1_attrs['qualifiedName'], description=ent1_attrs['description']), + name=ent1_attrs['name'], description=ent1_attrs['description']), PopularTable(database=self.entity_type, cluster=self.cluster, schema=self.db, - name=ent2_attrs['qualifiedName'], description=ent1_attrs['description']), + name=ent2_attrs['name'], description=ent1_attrs['description']), ] self.assertEqual(expected.__repr__(), response.__repr__()) + # noinspection PyTypeChecker def test_get_popular_tables_without_db(self): - meta1 = self.metadata1.copy() - meta2 = self.metadata2.copy() + meta1 = copy.deepcopy(self.metadata1) + meta2 = copy.deepcopy(self.metadata2) for meta in [meta1, meta2]: - meta['relationshipAttributes']['parentEntity']['attributes']['qualifiedName'] = 'meta@cluster' + meta['relationshipAttributes']['parentEntity']['attributes']['qualifiedName'] = \ + meta['relationshipAttributes']['parentEntity']['attributes']['name'] metadata1 = self.to_class(meta1) metadata2 = self.to_class(meta2) @@ -177,10 +181,10 @@ def test_get_popular_tables_without_db(self): ent2_attrs = self.entity2['attributes'] expected = [ - PopularTable(database=self.entity_type, cluster='', schema='', - name=ent1_attrs['qualifiedName'], description=ent1_attrs['description']), - PopularTable(database=self.entity_type, cluster='', schema='', - name=ent2_attrs['qualifiedName'], description=ent1_attrs['description']), + PopularTable(database=self.entity_type, cluster='default', schema='default', + name=ent1_attrs['name'], description=ent1_attrs['description']), + PopularTable(database=self.entity_type, cluster='default', schema='default', + name=ent2_attrs['name'], description=ent1_attrs['description']), ] self.assertEqual(expected.__repr__(), response.__repr__()) @@ -203,19 +207,22 @@ def test_put_table_description(self): description="DOESNT_MATTER") def test_get_tags(self): - name = "DUMMY_CLASSIFICATION" - mocked_classif = MagicMock() - mocked_classif.name = name + tag_response = { + 'tagEntities': { + 'PII': 3, + 'NON_PII': 2 + } + } - mocked_def = MagicMock() - mocked_def.classificationDefs = [mocked_classif] + mocked_metrics = MagicMock() + mocked_metrics.tag = tag_response - self.proxy._driver.typedefs = [mocked_def] + self.proxy._driver.admin_metrics = [mocked_metrics] response = self.proxy.get_tags() - expected = [TagDetail(tag_name=name, tag_count=0)] - self.assertEqual(response.__repr__(), expected.__repr__()) + expected = [TagDetail(tag_name='PII', tag_count=3), TagDetail(tag_name='NON_PII', tag_count=2)] + self.assertEqual(expected.__repr__(), response.__repr__()) def test_add_tag(self): tag = "TAG" @@ -248,7 +255,7 @@ def test_get_column(self): self._mock_get_table_entity() response = self.proxy._get_column( table_uri=self.table_uri, - column_name=self.test_column['attributes']['qualifiedName']) + column_name=self.test_column['attributes']['name']) self.assertDictEqual(response, self.test_column) def test_get_column_wrong_name(self): @@ -267,13 +274,13 @@ def test_get_column_description(self): self._mock_get_table_entity() response = self.proxy.get_column_description( table_uri=self.table_uri, - column_name=self.test_column['attributes']['qualifiedName']) + column_name=self.test_column['attributes']['name']) self.assertEqual(response, self.test_column['attributes'].get('description')) def test_put_column_description(self): self._mock_get_table_entity() self.proxy.put_column_description(table_uri=self.table_uri, - column_name=self.test_column['attributes']['qualifiedName'], + column_name=self.test_column['attributes']['name'], description='DOESNT_MATTER')