From d81bc7dbd01a40994a8778a18958e8836cb042d5 Mon Sep 17 00:00:00 2001 From: Nanne Wielinga Date: Thu, 2 May 2019 14:06:11 +0200 Subject: [PATCH] Add support for Apache Atlas DSL search We add another implementation of the base proxy with the Apache Atlas as backend. In the document we describe the different ways to integrate Apache Atlas search into Amundsen. --- README.md | 4 + docs/atlas-search.md | 35 ++++ requirements.txt | 1 + search_service/config.py | 15 ++ search_service/proxy/atlas.py | 230 ++++++++++++++++++++++++++ search_service/proxy/base.py | 2 - tests/unit/proxy/test_atlas.py | 285 +++++++++++++++++++++++++++++++++ 7 files changed, 570 insertions(+), 2 deletions(-) create mode 100644 docs/atlas-search.md create mode 100644 search_service/proxy/atlas.py create mode 100644 tests/unit/proxy/test_atlas.py diff --git a/README.md b/README.md index f2484638..ae4f6275 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,10 @@ Proxy package contains proxy modules that talks dependencies of Search service. ##### [Elasticsearch proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/elasticsearch.py "Elasticsearch proxy module") [Elasticsearch](https://www.elastic.co/products/elasticsearch "Elasticsearch") proxy module serves various use case of searching metadata from Elasticsearch. It uses [Query DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html "Query DSL") for the use case, execute the search query and transform into [model](https://github.com/lyft/amundsensearchlibrary/tree/master/search_service/models "model"). +##### [Atlas proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/atlas.py "Atlas proxy module") +[Apache Atlas](https://atlas.apache.org/ "Apache Atlas") proxy module uses Atlas to serve the Atlas requests. At the moment the search DSL REST api is used via the [Python Client](https://atlasclient.readthedocs.io/ "Atlas Client"). 
+ + ##### [Statsd utilities module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/statsd_utilities.py "Statsd utilities module") [Statsd](https://github.com/etsy/statsd/wiki "Statsd") utilities module has methods / functions to support statsd to publish metrics. By default, statsd integration is disabled and you can turn in on from [Search service configuration](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/config.py#L7 "Search service configuration"). For specific configuration related to statsd, you can configure it through [environment variable.](https://statsd.readthedocs.io/en/latest/configure.html#from-the-environment "environment variable.") diff --git a/docs/atlas-search.md b/docs/atlas-search.md new file mode 100644 index 00000000..6f89d8dc --- /dev/null +++ b/docs/atlas-search.md @@ -0,0 +1,35 @@ +# Atlas search investigation +There are several approaches to integrate searching within [Apache Atlas](https://atlas.apache.org/ "Apache Atlas"); we describe multiple options below: + +- Use REST APIs + +Directly using the Atlas APIs is quick to implement and easy to set up for administrators. Atlas uses a search engine +under the hood (embedded Solr) to perform search queries, thus in theory this method should scale up. A disadvantage is that +we are limited to the REST API that Atlas offers, although we could potentially add functionality via pull requests and extend +the search capabilities. The [advanced search](https://atlas.apache.org/Search-Advanced.html "Apache Atlas Advanced Search") +provides a DSL which contains basic forms of aggregation and arithmetic. + +- Use Data Builder to fill Elasticsearch from Atlas + +Adopting Atlas within the Data Builder to fill Elasticsearch is a relatively straightforward way of staying +compatible with the Neo4j database. It could either be pulling data from Atlas or being pushed by Kafka. 
This method +requires a setup of Elasticsearch and Airflow, which increases the amount of infrastructure and maintenance. +Another disadvantage is that with a big inflow of metadata this method might not scale as well as the other methods. + +- Use underlying Solr or Elasticsearch from Apache Atlas + +Within Atlas there is the possibility to open up either Solr or the experimental Elasticsearch. It depends on JanusGraph +(the behind-the-scenes graph database) which populates the search engine. Therefore the search engine would not be compatible with +the data builder setup. Adoption of such a search engine would require either new queries, some kind of transformer +within the search engine, or changes within Atlas itself. + +## Discussion +Both the REST API approach and the data builder approach can be implemented and made configurable. Both approaches have +their own benefits: the data builder provides a more fine-tuned search whereas the Atlas REST API comes out +of the box with Atlas. The last approach of using the underlying search engine from Atlas provides direct access +to all the metadata with a decent search API. However, integration would be less straightforward as the indexes would +differ from the data builder's search engine loader. + + +The focus is initially to implement the REST API approach and afterwards implement an Atlas data extractor and perhaps + importer. 
diff --git a/requirements.txt b/requirements.txt index 83a893f0..fc3eb61d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,4 @@ typing==3.6.4 Werkzeug==0.14.1 wheel==0.31.1 mypy==0.660 +atlasclient==0.1.6 diff --git a/search_service/config.py b/search_service/config.py index a16c2e4c..2d9178d9 100644 --- a/search_service/config.py +++ b/search_service/config.py @@ -14,6 +14,7 @@ PROXY_CLIENT = 'PROXY_CLIENT' PROXY_CLIENTS = { 'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy', + 'ATLAS': 'search_service.proxy.atlas.AtlasProxy' } @@ -23,6 +24,20 @@ class Config: LOG_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z' LOG_LEVEL = 'INFO' + # Used to differentiate tables from other entities in Atlas. For more details: + # https://github.com/lyft/amundsenmetadatalibrary/blob/master/docs/proxy/atlas_proxy.md + ATLAS_TABLE_ENTITY = 'Table' + + # The relationshipAttribute name of the Atlas entity that identifies the database entity. + ATLAS_DB_ATTRIBUTE = 'db' + + # Display name of Atlas entities that we use for the Amundsen project. + # Atlas uses qualifiedName as the indexed attribute, but also supports the 'name' attribute. 
+    ATLAS_NAME_ATTRIBUTE = 'qualifiedName'
+
+    # Config used by Elasticsearch
+    ELASTICSEARCH_INDEX = 'table_search_index'
+
 
 class LocalConfig(Config):
     DEBUG = False
diff --git a/search_service/proxy/atlas.py b/search_service/proxy/atlas.py
new file mode 100644
index 00000000..52157f57
--- /dev/null
+++ b/search_service/proxy/atlas.py
@@ -0,0 +1,237 @@
+import logging
+
+from atlasclient.client import Atlas
+from atlasclient.exceptions import BadRequest
+from atlasclient.models import Entity, EntityCollection
+from flask import current_app as app
+from typing import Dict, List, Optional
+
+from search_service.models.search_result import SearchResult
+from search_service.models.table import Table
+from search_service.proxy import BaseProxy
+from search_service.proxy.statsd_utilities import timer_with_counter
+
+# default search page size
+DEFAULT_PAGE_SIZE = 10
+LOGGING = logging.getLogger(__name__)
+
+
+class AtlasProxy(BaseProxy):
+    """
+    AtlasSearch connection handler
+    """
+    ATTRS_KEY = 'attributes'
+    REL_ATTRS_KEY = 'relationshipAttributes'
+
+    atlas: Atlas
+
+    def __init__(self, *,
+                 host: Optional[str] = None,
+                 index: Optional[str] = None,
+                 user: str = '',
+                 password: str = '',
+                 atlas_client: Optional[Atlas] = None,
+                 page_size: int = DEFAULT_PAGE_SIZE) -> None:
+        """
+        :param host: address of the Atlas server, only used when no {atlas_client} is given
+        :param index: kept on the proxy, the Atlas queries in this module do not use it
+        :param user: user name for the Atlas client
+        :param password: password for the Atlas client
+        :param atlas_client: existing Atlas client to use instead of creating one
+        :param page_size: number of tables per search page
+        """
+        # Read the entity settings here instead of in the class body, so that
+        # importing this module does not need an active Flask application context.
+        self.TABLE_ENTITY = app.config['ATLAS_TABLE_ENTITY']
+        self.DB_ATTRIBUTE = app.config['ATLAS_DB_ATTRIBUTE']
+        self.NAME_ATTRIBUTE = app.config['ATLAS_NAME_ATTRIBUTE']
+
+        if atlas_client:
+            self.atlas = atlas_client
+        else:
+            self.atlas = self._create_client_from_credentials(host=host,
+                                                              user=user,
+                                                              password=password)
+
+        self.index = index
+        self.page_size = page_size
+
+    @staticmethod
+    def _create_client_from_credentials(*,
+                                        host: Optional[str] = None,
+                                        user: str = '',
+                                        password: str = '') -> Atlas:
+        """
+        Construct Atlas client that connects to the server at {host}
+        and authenticates using {user} and {password}
+        :return: Atlas client object
+        """
+        return Atlas(host, username=user, password=password)
+
+    @staticmethod
+    def _entities(collections: EntityCollection) -> List[Entity]:
+        """
+        Helper method for flattening all collections from {collections}
+        :return: list of all entities
+        """
+        entities: List[Entity] = []
+        for collection in collections:
+            entities.extend(collection.entities)
+        return entities
+
+    def _parse_results(self, response: EntityCollection) -> List[Table]:
+        """
+        based on an atlas {response} with table entities, we map the required information
+        :param response: entities of a search result, only their guid is read
+        :return: list of tables
+        """
+        ids = [hit.guid for hit in response]
+        if not ids:
+            # nothing to look up
+            return []
+
+        # receive all entities
+        entities = self._entities(self.atlas.entity_bulk(guid=ids))
+
+        # collect the guid of the database of every table that refers to one
+        db_ids = []
+        for entity in entities:
+            database = entity.relationshipAttributes.get(self.DB_ATTRIBUTE)
+            if database:
+                db_ids.append(database['guid'])
+
+        # request databases
+        dbs_list = self._entities(self.atlas.entity_bulk(guid=db_ids)) if db_ids else []
+        dbs_dict: Dict[str, Entity] = {db.guid: db for db in dbs_list}
+
+        table_results = []
+        for entity in entities:
+            attrs = entity.attributes
+            database = entity.relationshipAttributes.get(self.DB_ATTRIBUTE)
+            if database and database['guid'] in dbs_dict:
+                db_attrs = dbs_dict[database['guid']].attributes
+                db_name = db_attrs.get(self.NAME_ATTRIBUTE, '')
+                db_cluster = db_attrs.get('clusterName', '')
+            else:
+                db_cluster = ''
+                db_name = ''
+
+            # Using or in case, if the key 'classifications' is there with attrs None
+            tags = [classification.get('typeName')
+                    for classification in attrs.get("classifications") or []]
+
+            # TODO: Implement columns
+            columns: List[str] = []
+
+            # 'description' is read with get() like the other attributes, so that
+            # a table without a description does not fail the whole search
+            table = Table(name=attrs.get(self.NAME_ATTRIBUTE),
+                          key=attrs.get(self.NAME_ATTRIBUTE),
+                          description=attrs.get('description'),
+                          cluster=db_cluster,
+                          database=db_name,
+                          schema_name=db_name,
+                          column_names=columns,
+                          tags=tags,
+                          last_updated_epoch=attrs.get('updateTime'))
+
+            table_results.append(table)
+
+        return table_results
+
+    def _search(self, *, sql: str, count_sql: str, page_index: int) -> SearchResult:
+        """
+        Run {count_sql} for the total amount of results and, when the requested
+        page lies within these results, {sql} for the tables of that page.
+        A query that is rejected by Atlas is logged and gives an empty page.
+        :param sql: query for the tables, without limit and offset
+        :param count_sql: query for the total amount of results
+        :param page_index: index of search page user is currently on
+        :return: SearchResult Object
+        """
+        tables: List[Table] = []
+        count_value = 0
+        offset = page_index * self.page_size
+        try:
+            # count results
+            count_results = list(self.atlas.search_dsl(query=count_sql))[0]
+            count_value = count_results._data['attributes']['values'][0][0]
+
+            # the tables are only requested when there is something to return
+            if count_value > 0 and offset < count_value:
+                search_results = self.atlas.search_dsl(
+                    query=f"{sql} limit {self.page_size} offset {offset}")
+                # unpack all collections (usually just one collection though)
+                for collection in search_results:
+                    if hasattr(collection, 'entities'):
+                        tables.extend(self._parse_results(response=collection.entities))
+        except BadRequest:
+            LOGGING.exception("Atlas rejected the search query: %s", sql)
+
+        return SearchResult(total_results=count_value, results=tables)
+
+    @timer_with_counter
+    def fetch_search_results_with_field(self, *,
+                                        query_term: str,
+                                        field_name: str,
+                                        field_value: str,
+                                        page_index: int = 0) -> SearchResult:
+        """
+        Query Atlas and return results as list of Table objects.
+        Per field name we have a count query and a query for the tables.
+        https://atlas.apache.org/Search-Advanced.html
+
+        :param query_term: search query term, not used by this method
+        :param field_name: field to search on, one of 'tag', 'schema', 'table' or 'column'
+        :param field_value: value for the field for filtering
+        :param page_index: index of search page user is currently on
+        :return: SearchResult Object, without results for an unknown {field_name}
+        """
+        # NOTE(review): {field_value} ends up in the query as is, confirm how
+        # quotes in a value have to be escaped for the Atlas DSL.
+        if field_name == 'tag':
+            sql = f"from Table where Table is '{field_value}'"
+            count_sql = f"{sql} select count()"
+        elif field_name == 'schema':
+            sql = f"from Table where db.name like '{field_value}'"
+            count_sql = f"{sql} select count()"
+        elif field_name == 'table':
+            sql = f"from Table where name like '{field_value}'"
+            count_sql = f"{sql} select count()"
+        elif field_name == 'column':
+            sql = f"hive_column where name like '{field_value}' select table"
+            # TODO nanne: count tables instead of columns
+            count_sql = f"hive_column where name like '{field_value}' select count()"
+        else:
+            # unknown field, no need to ask Atlas for a result that is always empty
+            return SearchResult(total_results=0, results=[])
+
+        LOGGING.debug("Used following sql query: %s", sql)
+        return self._search(sql=sql, count_sql=count_sql, page_index=page_index)
+
+    @timer_with_counter
+    def fetch_search_results(self, *,
+                             query_term: str,
+                             page_index: int = 0) -> SearchResult:
+        """
+        Query Atlas and return results as list of Table objects
+        We use the Atlas DSL for querying the tables.
+        https://atlas.apache.org/Search-Advanced.html
+
+        :param query_term: search query term
+        :param page_index: index of search page user is currently on
+        :return: SearchResult Object
+        """
+
+        if not query_term:
+            # return empty result for blank query term
+            return SearchResult(total_results=0, results=[])
+
+        # define query
+        # NOTE(review): {query_term} ends up in the query as is, confirm how
+        # quotes in a term have to be escaped for the Atlas DSL.
+        sql = f"Table from Table " \
+              f"where name like '*{query_term}*' or " \
+              f"description like '*{query_term}*' "
+
+        return self._search(sql=sql, count_sql=f"{sql} select count()", page_index=page_index)
diff --git a/search_service/proxy/base.py b/search_service/proxy/base.py
index 559adf2d..cbcc27e1 100644
--- a/search_service/proxy/base.py
+++ b/search_service/proxy/base.py
@@ -1,7 +1,5 @@
 from abc import ABCMeta, abstractmethod
 
-from typing import Union, List, Dict, Any
-
 from search_service.models.search_result import
SearchResult
 
 
diff --git a/tests/unit/proxy/test_atlas.py b/tests/unit/proxy/test_atlas.py
new file mode 100644
index 00000000..e020b3e5
--- /dev/null
+++ b/tests/unit/proxy/test_atlas.py
@@ -0,0 +1,292 @@
+import unittest
+
+from mock import MagicMock
+from typing import Any, Callable, List, Tuple
+
+from search_service import create_app
+from search_service.models.search_result import SearchResult
+from search_service.models.table import Table
+
+
+class TestAtlasProxy(unittest.TestCase):
+    maxDiff = None
+
+    def setUp(self):
+        self.app = create_app(config_module_class='search_service.config.LocalConfig')
+        self.app_context = self.app.app_context()
+        self.app_context.push()
+        # pop the context after each test, so that it does not leak into the next one
+        self.addCleanup(self.app_context.pop)
+
+        # imported with the context pushed, because the proxy reads the app configuration
+        from search_service.proxy.atlas import AtlasProxy
+        self.proxy = AtlasProxy(host='DOES_NOT_MATTER:0000')
+        self.proxy.atlas = MagicMock()
+
+        self.entity_type = 'TEST_ENTITY'
+        self.cluster = 'TEST_CLUSTER'
+        self.db = 'TEST_DB'
+        self.name = 'TEST_TABLE'
+        self.table_uri = f'{self.entity_type}://{self.cluster}.{self.db}/{self.name}'
+
+        self.classification_entity = {
+            'classifications': [
+                {'typeName': 'PII_DATA', 'name': 'PII_DATA'},
+            ]
+        }
+        self.test_column = {
+            'guid': 'DOESNT_MATTER',
+            'typeName': 'COLUMN',
+            'attributes': {
+                'qualifiedName': 'column@name',
+                'type': 'Managed',
+                'description': 'column description',
+                'position': 1
+            }
+
+        }
+        self.db_entity = {
+            'guid': '-100',
+            'typeName': self.entity_type,
+            'attributes': {
+                'qualifiedName': self.db,
+                'name': self.db
+            }
+        }
+        self.entity1 = {
+            'guid': '1',
+            'typeName': self.entity_type,
+            'classificationNames': [
+                'PII_DATA'
+            ],
+            'relationshipAttributes': {
+                'db': self.db_entity
+            },
+            'attributes': {
+                'updateTime': 123,
+                'qualifiedName': 'Table1_Qualified',
+                'name': 'Table1',
+                'classifications': [
+                    {'typeName': 'PII_DATA'}
+                ],
+                'description': 'Dummy Description',
+                'owner': 'dummy@email.com',
+                'columns': [self.test_column],
+                'db': self.db_entity
+            }
+        }
+        self.entity1.update(self.classification_entity)
+
+        self.entity2 = {
+            'guid': '2',
+            'typeName': self.entity_type,
+            'classificationNames': [],
+            'attributes': {
+                'updateTime': 234,
+                'qualifiedName': 'Table2_Qualified',
+                'name': 'Table1',
+                'db': None,
+                'description': 'Dummy Description',
+                'owner': 'dummy@email.com',
+            }
+        }
+        self.entity2.update(self.classification_entity)
+        self.entities = {
+            'entities': [
+                self.entity1,
+                self.entity2,
+            ],
+        }
+
+    @staticmethod
+    def recursive_mock(start: Any):
+        """
+        The atlas client allows retrieval of data via __getattr__.
+        That is why we build this method to recursively mock dictionaries to add
+        the __getattr__ and to convert them into MagicMock.
+        :param start: dictionary, list, or other
+        :return: MagicMock, list with mocked items, or other
+        """
+        if isinstance(start, dict):
+            dict_mock = MagicMock()
+            dict_mock.__getitem__.side_effect = start.__getitem__
+            dict_mock.__iter__.side_effect = start.__iter__
+            dict_mock.__contains__.side_effect = start.__contains__
+            dict_mock.get.side_effect = start.get
+            for key, value in start.items():
+                value_mock = TestAtlasProxy.recursive_mock(value)
+                dict_mock.__setattr__(key, value_mock)
+                start[key] = value_mock
+            return dict_mock
+        elif isinstance(start, (list,)):
+            return list(map(TestAtlasProxy.recursive_mock, start))
+        else:
+            return start
+
+    @staticmethod
+    def dsl_inject(checks: List[Tuple[Callable[[str], bool], dict]]):
+        """
+        helper method for returning results based on sql queries
+        :param checks: pairs of a predicate on the query and the data to return when it matches
+        :return: replacement for search_dsl of the atlas client, raises for an unmatched query
+        """
+        def search_dsl(query: str):
+            for check, data in checks:
+                if check(query):
+                    response = MagicMock()
+                    d = TestAtlasProxy.recursive_mock(data)
+                    d.__iter__.return_value = [d]
+                    d._data = {
+                        'queryType': "DSL",
+                        'queryText': query,
+                        **data
+                    }
+                    response.__iter__.return_value = [d]
+
+                    return response
+            raise Exception(f"query not supported: {query}")
+
+        return search_dsl
+
+    @staticmethod
+    def bulk_inject(entities: List[dict]):
+        """
+        provide an entity_bulk method for atlas
+        :param entities: all entities that can be requested
+        :return: function that returns the {entities} with a guid in the requested list
+        """
+
+        def guid_filter(guid: List):
+            return TestAtlasProxy.recursive_mock([{
+                'entities': list(filter(lambda x: x['guid'] in guid, entities))
+            }])
+
+        return guid_filter
+
+    def test_search_normal(self):
+        expected = SearchResult(total_results=2,
+                                results=[Table(name='Table1_Qualified',
+                                               key='Table1_Qualified',
+                                               description='Dummy Description',
+                                               cluster='',
+                                               database='TEST_DB',
+                                               schema_name='TEST_DB',
+                                               column_names=[
+                                                   # 'column@name'
+                                               ],
+                                               tags=['PII_DATA'],
+                                               last_updated_epoch=123),
+                                         Table(name='Table2_Qualified',
+                                               key='Table2_Qualified',
+                                               description='Dummy Description',
+                                               cluster='',
+                                               database='',
+                                               schema_name='',
+                                               column_names=[
+                                                   # 'column@name'
+                                               ],
+                                               tags=[],
+                                               last_updated_epoch=234),
+                                         ])
+        self.proxy.atlas.search_dsl = self.dsl_inject(
+            [
+                (lambda dsl: "select count()" in dsl and "Table" in dsl,
+                 {"attributes": {"name": ["count()"], "values": [[2]]}}),
+                (lambda dsl: "Table" in dsl and any(x in dsl for x in ["select table", "from Table"]),
+                 {'entities': [self.entity1, self.entity2]})
+            ]
+        )
+        self.proxy.atlas.entity_bulk = self.bulk_inject([
+            self.entity1,
+            self.entity2,
+            self.db_entity
+        ])
+        resp = self.proxy.fetch_search_results(query_term="Table")
+        self.assertEqual(resp.total_results, 2, "there should be 2 search result")
+        self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!")
+        self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]),
+                             "Search Result doesn't match with expected result!")
+        self.assertDictEqual(vars(resp.results[1]), vars(expected.results[1]),
+                             "Search Result doesn't match with expected result!")
+
+    def test_search_empty(self):
+        expected = SearchResult(total_results=0,
+                                results=[])
+        self.proxy.atlas.search_dsl = self.dsl_inject([
+            (lambda dsl: "select count()" in dsl,
+             {"attributes": {"name": ["count()"], "values": [[0]]}}),
+            (lambda dsl: any(x in dsl for x in ["select table", "from Table"]),
+             {'entities': []})
+        ])
+        self.proxy.atlas.entity_bulk = self.bulk_inject([
+            self.entity1,
+            self.entity2,
+            self.db_entity
+        ])
+        resp = self.proxy.fetch_search_results(query_term="Table1")
+        self.assertEqual(resp.total_results, 0, "there should no search results")
+        self.assertIsInstance(resp, SearchResult, "Search result received is not of 'SearchResult' type!")
+        self.assertDictEqual(vars(resp), vars(expected),
+                             "Search Result doesn't match with expected result!")
+
+    def test_search_fields(self):
+        fields = ['tag', 'schema', 'table', 'column']
+        expected = SearchResult(total_results=1,
+                                results=[Table(name='Table1_Qualified',
+                                               key='Table1_Qualified',
+                                               description='Dummy Description',
+                                               cluster='',
+                                               database='TEST_DB',
+                                               schema_name='TEST_DB',
+                                               column_names=[
+                                                   # 'column@name'
+                                               ],
+                                               tags=['PII_DATA'],
+                                               last_updated_epoch=123)])
+        for field in fields:
+            with self.subTest(field=field):
+                self.proxy.atlas.search_dsl = self.dsl_inject(
+                    [
+                        (lambda dsl: "select count()" in dsl,
+                         {"attributes": {"name": ["count()"], "values": [[1]]}}),
+                        (lambda dsl: any(x in dsl for x in ["select table", "from Table", "hive_column"]),
+                         {'entities': [self.entity1]})
+                    ]
+                )
+                self.proxy.atlas.entity_bulk = self.bulk_inject([
+                    self.entity1,
+                    self.db_entity
+                ])
+                resp = self.proxy.fetch_search_results_with_field(
+                    query_term=field + "Table1",
+                    field_name=field,
+                    field_value="Table1"
+                )
+                self.assertEqual(resp.total_results, 1, "there should be 1 search result")
+                self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!")
+                self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]),
+                                     "Search Result doesn't match with expected result!")
+
+    def test_unknown_field(self):
+        expected = SearchResult(total_results=0,
+                                results=[])
+        self.proxy.atlas.search_dsl = self.dsl_inject([
+            (lambda dsl: "select count()" in dsl,
+             {"attributes": {"name": ["count()"], "values": [[0]]}}),
+            (lambda dsl: any(x in dsl for x in ["select table", "from Table"]),
+             {'entities': []})
+        ])
+        self.proxy.atlas.entity_bulk = self.bulk_inject([
+            self.entity1,
+            self.entity2,
+            self.db_entity
+        ])
+        resp = self.proxy.fetch_search_results_with_field(
+            query_term="unknown:Table1",
+            field_name="unknown",
+            field_value="Table1"
+        )
+        self.assertEqual(resp.total_results, 0, "there should no search results")
+        self.assertIsInstance(resp, SearchResult, "Search result received is not of 'SearchResult' type!")
+        self.assertDictEqual(vars(resp), vars(expected),
+                             "Search Result doesn't match with expected result!")