diff --git a/README.md b/README.md index d7f14436..04b84626 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,10 @@ Proxy package contains proxy modules that talks dependencies of Search service. ##### [Elasticsearch proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/elasticsearch.py "Elasticsearch proxy module") [Elasticsearch](https://www.elastic.co/products/elasticsearch "Elasticsearch") proxy module serves various use case of searching metadata from Elasticsearch. It uses [Query DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html "Query DSL") for the use case, execute the search query and transform into [model](https://github.com/lyft/amundsensearchlibrary/tree/master/search_service/models "model"). +##### [Atlas proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/atlas.py "Atlas proxy module") +[Apache Atlas](https://atlas.apache.org/ "Apache Atlas") proxy module uses Atlas to serve the Atlas requests. At the moment the search DSL REST api is used via the [Python Client](https://atlasclient.readthedocs.io/ "Atlas Client"). + + ##### [Statsd utilities module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/statsd_utilities.py "Statsd utilities module") [Statsd](https://github.com/etsy/statsd/wiki "Statsd") utilities module has methods / functions to support statsd to publish metrics. By default, statsd integration is disabled and you can turn in on from [Search service configuration](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/config.py#L7 "Search service configuration"). For specific configuration related to statsd, you can configure it through [environment variable.](https://statsd.readthedocs.io/en/latest/configure.html#from-the-environment "environment variable.") diff --git a/docs/atlas-search.md b/docs/atlas-search.md new file mode 100644 index 00000000..7da4a880 --- /dev/null +++ b/docs/atlas-search.md @@ -0,0 +1,35 @@ +# Atlas search investigation +There are several approaches to integrate searching within [Apache Atlas](https://atlas.apache.org/ "Apache Atlas"), we describe multiple options below: + +- Use REST API's + +Directly using the Atlas API's is quick to implement and easy to setup for administrators. Atlas uses a search engine +underwater (embedded Solr) to perform search queries, thus in theory this method should scale up. Disadvantages are that +we are limited to the REST api that Atlas offers, we could potentially add functionality via pull requests and extend +the search capabilities. The [advanced search](https://atlas.apache.org/Search-Advanced.html "Apache Atlas Advanced Search") +provides a DSL which contains basic forms of aggregation and arithmetic. + +- Use Data Builder to fill Elasticsearch from Atlas + +Adopting Atlas within the Data Builder to fill Elasticsearch is a relatively straightforward way of staying +compatible with the Neo4j database. It could either be pulling data from Atlas or being pushed by Kafka. This method +requires a setup of Elasticsearch and Airflow, which increases the amount of infrastructure and maintenance. +Another disadvantage is that with a big inflow of metadata this method might not scale as well as the other methods. + +- Use underlying Solr or Elasticsearch from Apache Atlas + +Within Atlas there is the possibility to open up either Solr or the experimental Elasticsearch. It depends on janusgraph +(the behind the scenes graph database) which populates the search engine. Therefore the search engine would not be compatible with +the data builder setup. Adoption of such a search engine would require either new queries, some kind of transformer +within the search engine, or changes within Atlas itself. + +## Discussion +Both the REST API approach and the data builder approach can be implemented and be configurable. Both approaches have +their own benefits, the data builder together provides a more fine-tuned search whereas the Atlas REST API comes out +of the box with Atlas. The last approach of using the underlying search engine from Atlas provides direct access +to all the meta data with a decent search API. However, integration would be less straight forward as the indexes would +differ from the data builders search engine loader. + + +The focus is initially to implement the REST API approach and afterwards potentially implement an Atlas data extractor +and importer within the Amundsen Data Builder. So that administrators have more flexibility in combining data sources. diff --git a/requirements.txt b/requirements.txt index 83a893f0..fc3eb61d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,4 @@ typing==3.6.4 Werkzeug==0.14.1 wheel==0.31.1 mypy==0.660 +atlasclient==0.1.6 diff --git a/search_service/config.py b/search_service/config.py index a16c2e4c..30d7e9da 100644 --- a/search_service/config.py +++ b/search_service/config.py @@ -14,15 +14,30 @@ PROXY_CLIENT = 'PROXY_CLIENT' PROXY_CLIENTS = { 'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy', + 'ATLAS': 'search_service.proxy.atlas.AtlasProxy' } class Config: - LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d (%(process)d:' \ + LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d (%(process)d:'\ '%(threadName)s) - %(message)s' LOG_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z' LOG_LEVEL = 'INFO' + # Used to differentiate tables with other entities in Atlas. For more details: + # https://github.com/lyft/amundsenmetadatalibrary/blob/master/docs/proxy/atlas_proxy.md + ATLAS_TABLE_ENTITY = 'Table' + + # The relationalAttribute name of Atlas Entity that identifies the database entity. + ATLAS_DB_ATTRIBUTE = 'db' + + # Display name of Atlas Entities that we use for amundsen project. + # Atlas uses qualifiedName as indexed attribute. but also supports 'name' attribute. + ATLAS_NAME_ATTRIBUTE = 'qualifiedName' + + # Config used by ElastichSearch + ELASTICSEARCH_INDEX = 'table_search_index' + class LocalConfig(Config): DEBUG = False diff --git a/search_service/proxy/atlas.py b/search_service/proxy/atlas.py new file mode 100644 index 00000000..0f996162 --- /dev/null +++ b/search_service/proxy/atlas.py @@ -0,0 +1,212 @@ +import logging + +from atlasclient.client import Atlas +from atlasclient.exceptions import BadRequest +from atlasclient.models import Entity, EntityCollection +# default search page size +from flask import current_app as app +from typing import List, Dict + +from search_service.models.search_result import SearchResult +from search_service.models.table import Table +from search_service.proxy import BaseProxy +from search_service.proxy.statsd_utilities import timer_with_counter + +DEFAULT_PAGE_SIZE = 10 +LOGGER = logging.getLogger(__name__) + + +class AtlasProxy(BaseProxy): + TABLE_ENTITY = app.config['ATLAS_TABLE_ENTITY'] + DB_ATTRIBUTE = app.config['ATLAS_DB_ATTRIBUTE'] + NAME_ATTRIBUTE = app.config['ATLAS_NAME_ATTRIBUTE'] + ATTRS_KEY = 'attributes' + REL_ATTRS_KEY = 'relationshipAttributes' + + """ + AtlasSearch connection handler + """ + atlas: Atlas + + def __init__(self, *, + host: str = None, + index: str = None, + user: str = '', + password: str = '', + page_size: int = DEFAULT_PAGE_SIZE) -> None: + self.atlas = Atlas(host, username=user, password=password) + self.index = index + self.page_size = page_size + + @staticmethod + def _entities(collections: EntityCollection) -> List[Entity]: + """ + Helper method for flattening all collections from {collections} + :return: list of all entities + """ + entities: List[Entity] = [] + for collection in collections: + entities.extend(collection.entities) + return entities + + def _parse_results(self, response: EntityCollection) -> List[Table]: + """ + based on an atlas {response} with table entities, we map the required information + :return: list of tables + """ + table_results = [] + ids = list() + for hit in response: + ids.append(hit.guid) + # receive all entities + entities = self._entities(self.atlas.entity_bulk(guid=ids)) + db_ids = [] + for entity in entities: + relations = entity.relationshipAttributes + database = relations.get(self.DB_ATTRIBUTE) + if database: + db_ids.append(database['guid']) + + # request databases + dbs_list = self._entities(self.atlas.entity_bulk(guid=db_ids)) if len(db_ids) > 0 else [] + dbs_dict: Dict[str, Entity] = {db.guid: db for db in dbs_list} + for entity in entities: + relations = entity.relationshipAttributes + attrs = entity.attributes + database = relations.get(self.DB_ATTRIBUTE) + if database and database['guid'] in dbs_dict: + db_entity = dbs_dict[database['guid']] + db_attrs = db_entity.attributes + + db_name = db_attrs.get(self.NAME_ATTRIBUTE) + db_cluster = db_attrs.get("clusterName", "") + else: + db_cluster = '' + db_name = '' + + tags = [] + # Using or in case, if the key 'classifications' is there with attrs None + for classification in attrs.get("classifications") or list(): + tags.append( + classification.get('typeName') + ) + + # TODO: Implement columns + columns: List[str] = [] + # for column in attrs.get('columns') or list(): + # col_entity = entity.referredEntities[column['guid']] + # col_attrs = col_entity['attributes'] + # columns.append(col_attrs.get(self.NAME_KEY)) + table_name = attrs.get(self.NAME_ATTRIBUTE) + table = Table(name=table_name, + key=f"{entity.typeName}://{db_cluster}.{db_name}/{table_name}", + description=attrs.get('description'), + cluster=db_cluster, + database=entity.typeName or 'Table', + schema_name=db_name, + column_names=columns, + tags=tags, + last_updated_epoch=attrs.get('updateTime')) + + table_results.append(table) + + return table_results + + @timer_with_counter + def fetch_search_results_with_field(self, *, + query_term: str, + field_name: str, + field_value: str, + page_index: int = 0) -> SearchResult: + """ + Query Atlas and return results as list of Table objects. + Per field name we have a count query and a query for the tables. + https://atlas.apache.org/Search-Advanced.html + + :param query_term: search query term + :param field_name: field name to do the searching(e.g schema_name, tag_names) + :param field_value: value for the field for filtering + :param page_index: index of search page user is currently on + :return: SearchResult Object + :return: + """ + + sql = f"Table from Table where false" + count_sql = f"{sql} select count()" + if field_name == 'tag': + sql = f"from Table where Table is '{field_value}'" + count_sql = f"{sql} select count()" + elif field_name == 'schema': + sql = f"from Table where db.name like '{field_value}'" + count_sql = f"{sql} select count()" + elif field_name == 'table': + sql = f"from Table where name like '{field_value}'" + count_sql = f"{sql} select count()" + elif field_name == 'column': + sql = f"hive_column where name like '{field_value}' select table" + # TODO nanne: count tables instead of columns + count_sql = f"hive_column where name like '{field_value}' select count()" + + LOGGER.debug(f"Used following sql query: {sql}") + tables: List[Table] = [] + count_value = 0 + try: + # count results + count_params = {'query': count_sql} + count_results = list(self.atlas.search_dsl(**count_params))[0] + count_value = count_results._data['attributes']['values'][0][0] + + params = {'query': f"{sql} limit {self.page_size} offset {page_index * self.page_size}"} + search_results = self.atlas.search_dsl(**params) + if count_value > 0 and page_index * self.page_size <= count_value: + # unpack all collections (usually just one collection though) + for collection in search_results: + if hasattr(collection, 'entities'): + tables.extend(self._parse_results(response=collection.entities)) + except BadRequest: + LOGGER.error("Atlas Search DSL error with the following query:", sql) + + return SearchResult(total_results=count_value, results=tables) + + @timer_with_counter + def fetch_search_results(self, *, + query_term: str, + page_index: int = 0) -> SearchResult: + """ + Query Atlas and return results as list of Table objects + We use the Atlas DSL for querying the tables. + https://atlas.apache.org/Search-Advanced.html + + :param query_term: search query term + :param page_index: index of search page user is currently on + :return: SearchResult Object + """ + + if not query_term: + # return empty result for blank query term + return SearchResult(total_results=0, results=[]) + + # define query + sql = f"Table from Table " \ + f"where name like '*{query_term}*' or " \ + f"description like '*{query_term}*' " + + # count amount of tables + count_params = {'query': f"{sql} select count()"} + count_results = list(self.atlas.search_dsl(**count_params))[0] + count_value = count_results._data['attributes']['values'][0][0] + + # select tables + params = { + 'query': f"{sql} " + f"limit {self.page_size} " + f"offset {page_index * self.page_size}"} + search_results = self.atlas.search_dsl(**params) + + # retrieve results + tables = [] + if 0 < count_value >= page_index * self.page_size: + for s in search_results: + tables.extend(self._parse_results(response=s.entities)) + + return SearchResult(total_results=count_value, results=tables) diff --git a/search_service/proxy/base.py b/search_service/proxy/base.py index 559adf2d..cbcc27e1 100644 --- a/search_service/proxy/base.py +++ b/search_service/proxy/base.py @@ -1,7 +1,5 @@ from abc import ABCMeta, abstractmethod -from typing import Union, List, Dict, Any - from search_service.models.search_result import SearchResult diff --git a/tests/unit/proxy/test_atlas.py b/tests/unit/proxy/test_atlas.py new file mode 100644 index 00000000..f1cf80ec --- /dev/null +++ b/tests/unit/proxy/test_atlas.py @@ -0,0 +1,301 @@ +import string +import unittest + +from mock import MagicMock +from typing import List, Callable, Tuple + +from search_service import create_app +from search_service.models.search_result import SearchResult +from search_service.models.table import Table + + +class TestAtlasProxy(unittest.TestCase): + maxDiff = None + + def setUp(self): + self.app = create_app(config_module_class='search_service.config.LocalConfig') + self.app_context = self.app.app_context() + self.app_context.push() + self.qn = False + with self.app_context: + from search_service.proxy.atlas import AtlasProxy + self.proxy = AtlasProxy(host='DOES_NOT_MATTER:0000') + self.proxy.atlas = MagicMock() + self.qn = self.app.config['ATLAS_NAME_ATTRIBUTE'] == "qualifiedName" + self.entity_type = 'TEST_ENTITY' + self.cluster = 'TEST_CLUSTER' + self.db = 'TEST_DB' + self.name = 'TEST_TABLE' + self.table_uri = f'{self.entity_type}://{self.cluster}.{self.db}/{self.name}' + + self.classification_entity = { + 'classifications': [ + {'typeName': 'PII_DATA', 'name': 'PII_DATA'}, + ] + } + self.test_column = { + 'guid': 'DOESNT_MATTER', + 'typeName': 'COLUMN', + 'attributes': { + 'qualifiedName': f"{self.db}.Table1.column@{self.cluster}", + 'type': 'Managed', + 'description': 'column description', + 'position': 1 + } + + } + self.db_entity = { + 'guid': '-100', + 'typeName': self.entity_type, + 'attributes': { + 'qualifiedName': self.db + "@" + self.cluster, + 'name': self.db, + 'clusterName': self.cluster + } + } + self.entity1 = { + 'guid': '1', + 'typeName': self.entity_type, + 'classificationNames': [ + 'PII_DATA' + ], + 'relationshipAttributes': { + 'db': self.db_entity + }, + 'attributes': { + 'updateTime': 123, + 'name': 'Table1', + 'qualifiedName': f"{self.db}.Table1@{self.cluster}", + 'classifications': [ + {'typeName': 'PII_DATA'} + ], + 'description': 'Dummy Description', + 'owner': 'dummy@email.com', + 'columns': [self.test_column], + 'db': self.db_entity + } + } + self.entity1.update(self.classification_entity) + + self.entity2 = { + 'guid': '2', + 'typeName': self.entity_type, + 'classificationNames': [], + 'attributes': { + 'updateTime': 234, + 'qualifiedName': f"Table2", + 'name': 'Table2', + 'db': None, + 'description': 'Dummy Description', + 'owner': 'dummy@email.com', + } + } + self.entity2.update(self.classification_entity) + self.entities = { + 'entities': [ + self.entity1, + self.entity2, + ], + } + + def _qualified(self, kind, name, table=None): + if not self.qn: + return name + if kind == "db": + return f"{name}@{self.cluster}" + if kind == "column" and table: + return f"{self.db}.{table}.{name}@{self.cluster}" + if kind == "table": + return f"{self.db}.{name}@{self.cluster}" + return name + + @staticmethod + def recursive_mock(start: any): + """ + The atlas client allows retrieval of data via __getattr__. + That is why we build this method to recursively mock dictionary's to add + the __getattr__ and to convert them into MagicMock. + :param start: dictionary, list, or other + :return: MagicMock, list with mocked items, or other + """ + if isinstance(start, dict): + dict_mock = MagicMock() + dict_mock.__getitem__.side_effect = start.__getitem__ + dict_mock.__iter__.side_effect = start.__iter__ + dict_mock.__contains__.side_effect = start.__contains__ + dict_mock.get.side_effect = start.get + for key, value in start.items(): + value_mock = TestAtlasProxy.recursive_mock(value) + dict_mock.__setattr__(key, value_mock) + start[key] = value_mock + return dict_mock + elif isinstance(start, (list,)): + return list(map(TestAtlasProxy.recursive_mock, start)) + else: + return start + + @staticmethod + def dsl_inject(checks: List[Tuple[Callable[[str], bool], dict]]): + """ + helper method for returning results based on sql queries + :param checks: + :return: + """ + def search_dsl(query: string): + for check, data in checks: + if check(query): + response = MagicMock() + d = TestAtlasProxy.recursive_mock(data) + d.__iter__.return_value = [d] + d._data = { + 'queryType': "DSL", + 'queryText': query, + **data + } + response.__iter__.return_value = [d] + + return response + raise Exception(f"query not supported: {query}") + + return search_dsl + + @staticmethod + def bulk_inject(entities: dict): + """ + provide an entity_bulk method for atlas + :param entities: + :return: + """ + + def guid_filter(guid: List): + return TestAtlasProxy.recursive_mock([{ + 'entities': list(filter(lambda x: x['guid'] in guid, entities)) + }]) + + return guid_filter + + def test_search_normal(self): + expected = SearchResult(total_results=1, + results=[Table(name=self._qualified('table', 'Table1'), + key=f"TEST_ENTITY://TEST_CLUSTER.{self._qualified('db', 'TEST_DB')}/" + f"{self._qualified('table', 'Table1')}", + description='Dummy Description', + cluster='TEST_CLUSTER', + database='TEST_ENTITY', + schema_name=self._qualified('db', 'TEST_DB'), + column_names=[ + # 'column@name' + ], + tags=['PII_DATA'], + last_updated_epoch=123), + Table(name='Table2', + key=f"TEST_ENTITY://./Table2", + description='Dummy Description', + cluster='', + database='TEST_ENTITY', + schema_name='', + column_names=[ + # 'column@name' + ], + tags=[], + last_updated_epoch=234), + ]) + self.proxy.atlas.search_dsl = self.dsl_inject( + [ + (lambda dsl: "select count()" in dsl and "Table" in dsl, + {"attributes": {"name": ["count()"], "values": [[2]]}}), + (lambda dsl: "Table" in dsl and any(x in dsl for x in ["select table", "from Table"]), + {'entities': [self.entity1, self.entity2]}) + ] + ) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.entity2, + self.db_entity + ]) + resp = self.proxy.fetch_search_results(query_term="Table") + self.assertTrue(resp.total_results == 2, "there should be 2 search result") + self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!") + self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]), + "Search Result doesn't match with expected result!") + self.assertDictEqual(vars(resp.results[1]), vars(expected.results[1]), + "Search Result doesn't match with expected result!") + + def test_search_empty(self): + expected = SearchResult(total_results=0, + results=[]) + self.proxy.atlas.search_dsl = self.dsl_inject([ + (lambda dsl: "select count()" in dsl, + {"attributes": {"name": ["count()"], "values": [[0]]}}), + (lambda dsl: any(x in dsl for x in ["select table", "from Table"]), + {'entities': []}) + ]) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.entity2, + self.db_entity + ]) + resp = self.proxy.fetch_search_results(query_term="Table1") + self.assertTrue(resp.total_results == 0, "there should no search results") + self.assertIsInstance(resp, SearchResult, "Search result received is not of 'SearchResult' type!") + self.assertDictEqual(vars(resp), vars(expected), + "Search Result doesn't match with expected result!") + + def test_search_fields(self): + fields = ['tag', 'schema', 'table', 'column'] + for field in fields: + + expected = SearchResult(total_results=1, + results=[Table(name=self._qualified('table', 'Table1'), + key=f"TEST_ENTITY://TEST_CLUSTER.{self._qualified('db', 'TEST_DB')}/" + f"{self._qualified('table', 'Table1')}", + description='Dummy Description', + cluster='TEST_CLUSTER', + database='TEST_ENTITY', + schema_name=self._qualified('db', 'TEST_DB'), + column_names=[ + # 'column@name' + ], + tags=['PII_DATA'], + last_updated_epoch=123)]) + self.proxy.atlas.search_dsl = self.dsl_inject( + [ + (lambda dsl: "select count()" in dsl, + {"attributes": {"name": ["count()"], "values": [[1]]}}), + (lambda dsl: any(x in dsl for x in ["select table", "from Table", "hive_column"]), + {'entities': [self.entity1]}) + ] + ) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.db_entity + ]) + resp = self.proxy.fetch_search_results_with_field( + query_term=field + "Table1", + field_name=field, + field_value="Table1" + ) + self.assertTrue(resp.total_results == 1, "there should be 1 search result") + self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!") + self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]), + "Search Result doesn't match with expected result!") + + def test_unknown_field(self): + expected = SearchResult(total_results=0, + results=[]) + self.proxy.atlas.search_dsl = self.dsl_inject([ + (lambda dsl: "select count()" in dsl, + {"attributes": {"name": ["count()"], "values": [[0]]}}), + (lambda dsl: any(x in dsl for x in ["select table", "from Table"]), + {'entities': []}) + ]) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.entity2, + self.db_entity + ]) + resp = self.proxy.fetch_search_results(query_term="unknown:Table1") + self.assertTrue(resp.total_results == 0, "there should no search results") + self.assertIsInstance(resp, SearchResult, "Search result received is not of 'SearchResult' type!") + self.assertDictEqual(vars(resp), vars(expected), + "Search Result doesn't match with expected result!")