From 99e893baccadd06b881700576c4c39198c56f847 Mon Sep 17 00:00:00 2001 From: jornh Date: Fri, 26 Apr 2019 00:44:46 +0200 Subject: [PATCH 01/13] gitignore dist/ as in metadataservice PR #28 (#12) --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 4cf3806d..6178df5d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,10 +6,11 @@ *.egg-info .*.swp .DS_Store +build/ +dist/ venv/ venv3/ .cache/ -build/ .idea/ .coverage *coverage.xml From 520ee914c97aa7824b6cac5c466562c686d4ec1a Mon Sep 17 00:00:00 2001 From: rosejcday <16930240+rosejcday@users.noreply.github.com> Date: Fri, 26 Apr 2019 14:48:34 -0400 Subject: [PATCH 02/13] Changed the name of this file for consistency (#13) To work along side the PR 'updated the install doc #104' in the amundsenfrontendlibrary repo --- Dockerfile => public.Dockerfile | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename Dockerfile => public.Dockerfile (100%) diff --git a/Dockerfile b/public.Dockerfile similarity index 100% rename from Dockerfile rename to public.Dockerfile From 9c6e52a3235bdc245e24b537797c42f0a336cc91 Mon Sep 17 00:00:00 2001 From: jornh Date: Wed, 1 May 2019 01:54:51 +0200 Subject: [PATCH 03/13] Doc fix: Docker pull the official image (#14) * Doc fix: Docker pull the official image Similar to metadata service fix https://github.com/lyft/amundsenmetadatalibrary/pull/35 * Typo fix --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bac1e37e..f2484638 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,8 @@ $ curl -v http://localhost:5000/healthcheck ## Instructions to start the service from the Docker ```bash -$ docker pull amundsen-search -$ docker run -p 5000:5000 amundsen-search +$ docker pull amundsendev/amundsen-search:latest +$ docker run -p 5000:5000 amundsendev/amundsen-search ``` In different terminal, verify the service is up by running @@ -90,5 +90,5 @@ Proxy package contains proxy modules that talks dependencies of Search service. For specific configuration related to statsd, you can configure it through [environment variable.](https://statsd.readthedocs.io/en/latest/configure.html#from-the-environment "environment variable.") ### [Models package](https://github.com/lyft/amundsensearchlibrary/tree/master/search_service/models "Models package") -Models package contains many modules where each module has many Python classes in it. These Python classes are being used as a schema and a data holder. All data exchange within Amundsen Search service use classes in Models to ensure validity of itself and improve readability and mainatability. +Models package contains many modules where each module has many Python classes in it. These Python classes are being used as a schema and a data holder. All data exchange within Amundsen Search service use classes in Models to ensure validity of itself and improve readability and maintainability. From d32e79d589849cb10d53f6b3424fe1cd72184452 Mon Sep 17 00:00:00 2001 From: Verdan Mahmood Date: Wed, 1 May 2019 18:41:10 +0200 Subject: [PATCH 04/13] Adds the PR template for amundsen search service (#15) Mostly copied from Apache Airflow, and using the same in other amundsen repositories. 
--- .github/PULL_REQUEST_TEMPLATE.md | 34 ++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/PULL_REQUEST_TEMPLATE.md diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..992f2b28 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,34 @@ +Make sure you have checked **all** steps below. + +### Title + +- [ ] My PR Title addresses the issue accurately and concisely. + - Example: "Updates the version of Flask to v1.0.2" + - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). + +### Description + +- [ ] Here are some details about my PR: + +### Tests + +- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: + +### Commits + +- [ ] I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": + 1. Subject is separated from body by a blank line + 1. Subject is limited to 50 characters (not including Jira issue reference) + 1. Subject does not end with a period + 1. Subject uses the imperative mood ("add", not "adding") + 1. Body wraps at 72 characters + 1. Body explains "what" and "why", not "how" + +### Documentation + +- [ ] In case of new functionality, my PR adds documentation that describes how to use it. + - All the public functions and the classes in the PR contain docstrings that explain what it does + +### Code Quality & Coverage + +- [ ] Passes `make test` From 2fd6f2697cdf9ac7e542963b267a58513fa32f58 Mon Sep 17 00:00:00 2001 From: Guido Schmutz Date: Fri, 3 May 2019 00:45:49 +0200 Subject: [PATCH 05/13] Set the elasticsearch base (endpoint) from env variable (#16) * support setting ELASTICSEARCH_BASE through environment variable * update formating to fix the CI error * Update config.py --- search_service/config.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/search_service/config.py b/search_service/config.py index 3699bc35..fb154263 100644 --- a/search_service/config.py +++ b/search_service/config.py @@ -1,3 +1,5 @@ +import os + ELASTICSEARCH_ENDPOINT_KEY = 'ELASTICSEARCH_ENDPOINT' ELASTICSEARCH_INDEX_KEY = 'ELASTICSEARCH_INDEX' ELASTICSEARCH_AUTH_USER_KEY = 'ELASTICSEARCH_AUTH_USER' @@ -19,7 +21,12 @@ class LocalConfig(Config): TESTING = False STATS = False LOCAL_HOST = '0.0.0.0' - ELASTICSEARCH_ENDPOINT = 'http://{LOCAL_HOST}:9200'.format(LOCAL_HOST=LOCAL_HOST) + ELASTICSEARCH_PORT = '9200' + ELASTICSEARCH_ENDPOINT = os.environ.get('ELASTICSEARCH_ENDPOINT', + 'http://{LOCAL_HOST}:{PORT}'.format( + LOCAL_HOST=LOCAL_HOST, + PORT=ELASTICSEARCH_PORT) + ) ELASTICSEARCH_INDEX = 'table_search_index' ELASTICSEARCH_AUTH_USER = 'elastic' ELASTICSEARCH_AUTH_PW = 'elastic' From 3df10771ec762f3e7392708e3f601bd2609e57f6 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Mon, 6 May 2019 16:43:17 -0700 Subject: [PATCH 06/13] Update README.md (#19) Add badge for search repo --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index f2484638..c326ed54 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,10 @@ # Amundsen Search service +[![PyPI version](https://badge.fury.io/py/amundsen-search.svg)](https://badge.fury.io/py/amundsen-search) +[![Build 
Status](https://api.travis-ci.com/lyft/amundsensearchlibrary.svg?branch=master)](https://travis-ci.com/lyft/amundsensearchlibrary) +[![License](http://img.shields.io/:license-Apache%202-blue.svg)](LICENSE) +[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://img.shields.io/badge/PRs-welcome-brightgreen.svg) +[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://bit.ly/2FVq37z) + Amundsen Search service serves a Restful API and is responsible for searching metadata. The service leverages [Elasticsearch](https://www.elastic.co/products/elasticsearch "Elasticsearch") for most of it's search capabilites. ## Instructions to start the Search service from distribution From 20eed2e3c8bed7894d1e860e0e2a7c18e6b73207 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 7 May 2019 10:27:21 -0700 Subject: [PATCH 07/13] Add codecov based for search repo (#20) --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index e4e79a20..e7a84ea0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,11 @@ python: install: - pip3 install -r requirements.txt - pip3 install mypy +- pip3 install codecov script: - make test +after_success: +- codecov deploy: provider: pypi user: amundsen-dev From a49d68568453098879d901a91f9585e3461bdb10 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 7 May 2019 10:50:00 -0700 Subject: [PATCH 08/13] Update README.md (#22) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c326ed54..d7f14436 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Amundsen Search service [![PyPI version](https://badge.fury.io/py/amundsen-search.svg)](https://badge.fury.io/py/amundsen-search) [![Build Status](https://api.travis-ci.com/lyft/amundsensearchlibrary.svg?branch=master)](https://travis-ci.com/lyft/amundsensearchlibrary) +[![Coverage Status](https://img.shields.io/codecov/c/github/lyft/amundsensearchlibrary/master.svg)](https://codecov.io/github/lyft/amundsensearchlibrary?branch=master) [![License](http://img.shields.io/:license-Apache%202-blue.svg)](LICENSE) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://img.shields.io/badge/PRs-welcome-brightgreen.svg) [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://bit.ly/2FVq37z) From 86ea652fb105f83a4fcd74076bc953155b9940a1 Mon Sep 17 00:00:00 2001 From: Tamika Tannis Date: Tue, 7 May 2019 15:47:46 -0700 Subject: [PATCH 09/13] Update PULL_REQUEST_TEMPLATE.md (#23) ### Summary of Changes This PR consolidates the pull request template initially added by @verdan. It's been requested to be updated across all of our repos: https://github.com/lyft/amundsenfrontendlibrary/pull/125#issuecomment-489228073 ### Tests Unit tests not required for template changes. ### Documentation Documentation not required for template changes. ### CheckList Make sure you have checked **all** steps below to ensure a timely review. - [ ] PR title addresses the issue accurately and concisely. Example: "Updates the version of Flask to v1.0.2" - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). - [ ] PR includes a summary of changes. - [ ] PR adds unit tests, updates existing unit tests, __OR__ documents why no test additions or modifications are needed. 
- [ ] In case of new functionality, my PR adds documentation that describes how to use it. - All the public functions and the classes in the PR contain docstrings that explain what it does - [ ] PR passes `make test` - [ ] I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" --- .github/PULL_REQUEST_TEMPLATE.md | 38 +++++++++++--------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 992f2b28..016dd7ec 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,34 +1,22 @@ -Make sure you have checked **all** steps below. +### Summary of Changes -### Title - -- [ ] My PR Title addresses the issue accurately and concisely. - - Example: "Updates the version of Flask to v1.0.2" - - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). - -### Description - -- [ ] Here are some details about my PR: +_Include a summary of changes then remove this line_ ### Tests -- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: - -### Commits - -- [ ] I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": - 1. Subject is separated from body by a blank line - 1. Subject is limited to 50 characters (not including Jira issue reference) - 1. Subject does not end with a period - 1. Subject uses the imperative mood ("add", not "adding") - 1. Body wraps at 72 characters - 1. Body explains "what" and "why", not "how" +_What tests did you add or modify and why? If no tests were added or modified, explain why. Remove this line_ ### Documentation +_What documentation did you add or modify and why? Add any relevant links then remove this line_ + +### CheckList +Make sure you have checked **all** steps below to ensure a timely review. +- [ ] PR title addresses the issue accurately and concisely. Example: "Updates the version of Flask to v1.0.2" + - In case you are adding a dependency, check if the license complies with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). +- [ ] PR includes a summary of changes. +- [ ] PR adds unit tests, updates existing unit tests, __OR__ documents why no test additions or modifications are needed. - [ ] In case of new functionality, my PR adds documentation that describes how to use it. - All the public functions and the classes in the PR contain docstrings that explain what it does - -### Code Quality & Coverage - -- [ ] Passes `make test` +- [ ] PR passes `make test` +- [ ] I have squashed multiple commits if they address the same issue. 
In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" From 1a05eb2f063a0805d16261fcf4a6751023f502f9 Mon Sep 17 00:00:00 2001 From: Nanne Date: Tue, 14 May 2019 17:58:51 +0200 Subject: [PATCH 10/13] Integrates Atlas DSL Search (#17) * Add base proxy behind elastic search to support other search engines * Add support for Apache Atlas DSL search * Add new proxy with the Apache Atlas as backend * Add document to describing Apache Atlas search methods * Update setup.py --- README.md | 4 + docs/atlas-search.md | 35 +++ requirements.txt | 1 + search_service/api/search.py | 10 +- search_service/config.py | 41 +++- search_service/proxy/__init__.py | 35 +++ search_service/proxy/atlas.py | 212 ++++++++++++++++++ search_service/proxy/base.py | 24 ++ search_service/proxy/elasticsearch.py | 3 +- setup.py | 2 +- tests/unit/proxy/test_atlas.py | 301 ++++++++++++++++++++++++++ 11 files changed, 652 insertions(+), 16 deletions(-) create mode 100644 docs/atlas-search.md create mode 100644 search_service/proxy/atlas.py create mode 100644 search_service/proxy/base.py create mode 100644 tests/unit/proxy/test_atlas.py diff --git a/README.md b/README.md index d7f14436..04b84626 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,10 @@ Proxy package contains proxy modules that talks dependencies of Search service. ##### [Elasticsearch proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/elasticsearch.py "Elasticsearch proxy module") [Elasticsearch](https://www.elastic.co/products/elasticsearch "Elasticsearch") proxy module serves various use case of searching metadata from Elasticsearch. It uses [Query DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html "Query DSL") for the use case, execute the search query and transform into [model](https://github.com/lyft/amundsensearchlibrary/tree/master/search_service/models "model"). +##### [Atlas proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/atlas.py "Atlas proxy module") +[Apache Atlas](https://atlas.apache.org/ "Apache Atlas") proxy module uses Atlas to serve the Atlas requests. At the moment the search DSL REST api is used via the [Python Client](https://atlasclient.readthedocs.io/ "Atlas Client"). + + ##### [Statsd utilities module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/statsd_utilities.py "Statsd utilities module") [Statsd](https://github.com/etsy/statsd/wiki "Statsd") utilities module has methods / functions to support statsd to publish metrics. By default, statsd integration is disabled and you can turn in on from [Search service configuration](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/config.py#L7 "Search service configuration"). For specific configuration related to statsd, you can configure it through [environment variable.](https://statsd.readthedocs.io/en/latest/configure.html#from-the-environment "environment variable.") diff --git a/docs/atlas-search.md b/docs/atlas-search.md new file mode 100644 index 00000000..7da4a880 --- /dev/null +++ b/docs/atlas-search.md @@ -0,0 +1,35 @@ +# Atlas search investigation +There are several approaches to integrate searching within [Apache Atlas](https://atlas.apache.org/ "Apache Atlas"), we describe multiple options below: + +- Use REST API's + +Directly using the Atlas API's is quick to implement and easy to setup for administrators. 
Atlas uses a search engine +underwater (embedded Solr) to perform search queries, thus in theory this method should scale up. Disadvantages are that +we are limited to the REST api that Atlas offers, we could potentially add functionality via pull requests and extend +the search capabilities. The [advanced search](https://atlas.apache.org/Search-Advanced.html "Apache Atlas Advanced Search") +provides a DSL which contains basic forms of aggregation and arithmetic. + +- Use Data Builder to fill Elasticsearch from Atlas + +Adopting Atlas within the Data Builder to fill Elasticsearch is a relatively straightforward way of staying +compatible with the Neo4j database. It could either be pulling data from Atlas or being pushed by Kafka. This method +requires a setup of Elasticsearch and Airflow, which increases the amount of infrastructure and maintenance. +Another disadvantage is that with a big inflow of metadata this method might not scale as well as the other methods. + +- Use underlying Solr or Elasticsearch from Apache Atlas + +Within Atlas there is the possibility to open up either Solr or the experimental Elasticsearch. It depends on janusgraph +(the behind the scenes graph database) which populates the search engine. Therefore the search engine would not be compatible with +the data builder setup. Adoption of such a search engine would require either new queries, some kind of transformer +within the search engine, or changes within Atlas itself. + +## Discussion +Both the REST API approach and the data builder approach can be implemented and be configurable. Both approaches have +their own benefits, the data builder together provides a more fine-tuned search whereas the Atlas REST API comes out +of the box with Atlas. The last approach of using the underlying search engine from Atlas provides direct access +to all the meta data with a decent search API. However, integration would be less straight forward as the indexes would +differ from the data builders search engine loader. + + +The focus is initially to implement the REST API approach and afterwards potentially implement an Atlas data extractor +and importer within the Amundsen Data Builder. So that administrators have more flexibility in combining data sources. 
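+
+As a rough sketch of the REST API approach, the snippet below issues an advanced-search (DSL) query
+through the [Python Client](https://atlasclient.readthedocs.io/ "Atlas Client"). The endpoint, the
+credentials and the `orders` search term are placeholders rather than Amundsen configuration values:
+
+```python
+from atlasclient.client import Atlas
+
+# Placeholder connection details for a reachable Atlas instance.
+atlas = Atlas('http://localhost:21000', username='admin', password='admin')
+
+# DSL query: one page of tables whose name matches a term.
+# Syntax reference: https://atlas.apache.org/Search-Advanced.html
+params = {'query': "Table from Table where name like '*orders*' limit 10 offset 0"}
+
+for collection in atlas.search_dsl(**params):
+    if hasattr(collection, 'entities'):
+        for entity in collection.entities:
+            print(entity.guid, entity.attributes.get('qualifiedName'))
+```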
diff --git a/requirements.txt b/requirements.txt index 83a893f0..fc3eb61d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,4 @@ typing==3.6.4 Werkzeug==0.14.1 wheel==0.31.1 mypy==0.660 +atlasclient==0.1.6 diff --git a/search_service/api/search.py b/search_service/api/search.py index 25e4b411..47928cca 100644 --- a/search_service/api/search.py +++ b/search_service/api/search.py @@ -2,7 +2,7 @@ from flask_restful import Resource, fields, marshal_with, reqparse -from search_service.proxy import elasticsearch +from search_service.proxy import get_proxy_client table_fields = { "name": fields.String, @@ -30,7 +30,7 @@ class SearchAPI(Resource): Search API """ def __init__(self) -> None: - self.elasticsearch = elasticsearch.get_elasticsearch_proxy() + self.proxy = get_proxy_client() self.parser = reqparse.RequestParser(bundle_errors=True) @@ -50,7 +50,7 @@ def get(self) -> Iterable[Any]: try: - results = self.elasticsearch.fetch_search_results( + results = self.proxy.fetch_search_results( query_term=args['query_term'], page_index=args['page_index'] ) @@ -68,7 +68,7 @@ class SearchFieldAPI(Resource): Search API with explict field """ def __init__(self) -> None: - self.elasticsearch = elasticsearch.get_elasticsearch_proxy() + self.proxy = get_proxy_client() self.parser = reqparse.RequestParser(bundle_errors=True) @@ -91,7 +91,7 @@ def get(self, *, field_name: str, args = self.parser.parse_args(strict=True) try: - results = self.elasticsearch.fetch_search_results_with_field( + results = self.proxy.fetch_search_results_with_field( query_term=args.get('query_term'), field_name=field_name, field_value=field_value, diff --git a/search_service/config.py b/search_service/config.py index fb154263..30d7e9da 100644 --- a/search_service/config.py +++ b/search_service/config.py @@ -8,6 +8,15 @@ SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE' STATS_FEATURE_KEY = 'STATS' +PROXY_ENDPOINT = 'PROXY_ENDPOINT' +PROXY_USER = 'PROXY_USER' +PROXY_PASSWORD = 'PROXY_PASSWORD' +PROXY_CLIENT = 'PROXY_CLIENT' +PROXY_CLIENTS = { + 'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy', + 'ATLAS': 'search_service.proxy.atlas.AtlasProxy' +} + class Config: LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d (%(process)d:'\ @@ -15,18 +24,32 @@ class Config: LOG_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z' LOG_LEVEL = 'INFO' + # Used to differentiate tables with other entities in Atlas. For more details: + # https://github.com/lyft/amundsenmetadatalibrary/blob/master/docs/proxy/atlas_proxy.md + ATLAS_TABLE_ENTITY = 'Table' + + # The relationalAttribute name of Atlas Entity that identifies the database entity. + ATLAS_DB_ATTRIBUTE = 'db' + + # Display name of Atlas Entities that we use for amundsen project. + # Atlas uses qualifiedName as indexed attribute. but also supports 'name' attribute. 
+ ATLAS_NAME_ATTRIBUTE = 'qualifiedName' + + # Config used by ElastichSearch + ELASTICSEARCH_INDEX = 'table_search_index' + class LocalConfig(Config): DEBUG = False TESTING = False STATS = False LOCAL_HOST = '0.0.0.0' - ELASTICSEARCH_PORT = '9200' - ELASTICSEARCH_ENDPOINT = os.environ.get('ELASTICSEARCH_ENDPOINT', - 'http://{LOCAL_HOST}:{PORT}'.format( - LOCAL_HOST=LOCAL_HOST, - PORT=ELASTICSEARCH_PORT) - ) - ELASTICSEARCH_INDEX = 'table_search_index' - ELASTICSEARCH_AUTH_USER = 'elastic' - ELASTICSEARCH_AUTH_PW = 'elastic' + PROXY_PORT = '9200' + PROXY_ENDPOINT = os.environ.get('PROXY_ENDPOINT', + 'http://{LOCAL_HOST}:{PORT}'.format( + LOCAL_HOST=LOCAL_HOST, + PORT=PROXY_PORT) + ) + PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')] + PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic') + PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic') diff --git a/search_service/proxy/__init__.py b/search_service/proxy/__init__.py index e69de29b..933b6ea2 100644 --- a/search_service/proxy/__init__.py +++ b/search_service/proxy/__init__.py @@ -0,0 +1,35 @@ +from threading import Lock + +from flask import current_app + +from search_service import config +from search_service.proxy.base import BaseProxy +from werkzeug.utils import import_string + +_proxy_client = None +_proxy_client_lock = Lock() + + +def get_proxy_client() -> BaseProxy: + """ + Provides singleton proxy client based on the config + :return: Proxy instance of any subclass of BaseProxy + """ + global _proxy_client + + if _proxy_client: + return _proxy_client + + with _proxy_client_lock: + if _proxy_client: + return _proxy_client + else: + # Gather all the configuration to create a Proxy Client + host = current_app.config[config.PROXY_ENDPOINT] + user = current_app.config[config.PROXY_USER] + password = current_app.config[config.PROXY_PASSWORD] + + client = import_string(current_app.config[config.PROXY_CLIENT]) + _proxy_client = client(host=host, index=None, user=user, password=password) + + return _proxy_client diff --git a/search_service/proxy/atlas.py b/search_service/proxy/atlas.py new file mode 100644 index 00000000..0f996162 --- /dev/null +++ b/search_service/proxy/atlas.py @@ -0,0 +1,212 @@ +import logging + +from atlasclient.client import Atlas +from atlasclient.exceptions import BadRequest +from atlasclient.models import Entity, EntityCollection +# default search page size +from flask import current_app as app +from typing import List, Dict + +from search_service.models.search_result import SearchResult +from search_service.models.table import Table +from search_service.proxy import BaseProxy +from search_service.proxy.statsd_utilities import timer_with_counter + +DEFAULT_PAGE_SIZE = 10 +LOGGER = logging.getLogger(__name__) + + +class AtlasProxy(BaseProxy): + TABLE_ENTITY = app.config['ATLAS_TABLE_ENTITY'] + DB_ATTRIBUTE = app.config['ATLAS_DB_ATTRIBUTE'] + NAME_ATTRIBUTE = app.config['ATLAS_NAME_ATTRIBUTE'] + ATTRS_KEY = 'attributes' + REL_ATTRS_KEY = 'relationshipAttributes' + + """ + AtlasSearch connection handler + """ + atlas: Atlas + + def __init__(self, *, + host: str = None, + index: str = None, + user: str = '', + password: str = '', + page_size: int = DEFAULT_PAGE_SIZE) -> None: + self.atlas = Atlas(host, username=user, password=password) + self.index = index + self.page_size = page_size + + @staticmethod + def _entities(collections: EntityCollection) -> List[Entity]: + """ + Helper method for flattening all collections from {collections} + :return: list of all 
entities + """ + entities: List[Entity] = [] + for collection in collections: + entities.extend(collection.entities) + return entities + + def _parse_results(self, response: EntityCollection) -> List[Table]: + """ + based on an atlas {response} with table entities, we map the required information + :return: list of tables + """ + table_results = [] + ids = list() + for hit in response: + ids.append(hit.guid) + # receive all entities + entities = self._entities(self.atlas.entity_bulk(guid=ids)) + db_ids = [] + for entity in entities: + relations = entity.relationshipAttributes + database = relations.get(self.DB_ATTRIBUTE) + if database: + db_ids.append(database['guid']) + + # request databases + dbs_list = self._entities(self.atlas.entity_bulk(guid=db_ids)) if len(db_ids) > 0 else [] + dbs_dict: Dict[str, Entity] = {db.guid: db for db in dbs_list} + for entity in entities: + relations = entity.relationshipAttributes + attrs = entity.attributes + database = relations.get(self.DB_ATTRIBUTE) + if database and database['guid'] in dbs_dict: + db_entity = dbs_dict[database['guid']] + db_attrs = db_entity.attributes + + db_name = db_attrs.get(self.NAME_ATTRIBUTE) + db_cluster = db_attrs.get("clusterName", "") + else: + db_cluster = '' + db_name = '' + + tags = [] + # Using or in case, if the key 'classifications' is there with attrs None + for classification in attrs.get("classifications") or list(): + tags.append( + classification.get('typeName') + ) + + # TODO: Implement columns + columns: List[str] = [] + # for column in attrs.get('columns') or list(): + # col_entity = entity.referredEntities[column['guid']] + # col_attrs = col_entity['attributes'] + # columns.append(col_attrs.get(self.NAME_KEY)) + table_name = attrs.get(self.NAME_ATTRIBUTE) + table = Table(name=table_name, + key=f"{entity.typeName}://{db_cluster}.{db_name}/{table_name}", + description=attrs.get('description'), + cluster=db_cluster, + database=entity.typeName or 'Table', + schema_name=db_name, + column_names=columns, + tags=tags, + last_updated_epoch=attrs.get('updateTime')) + + table_results.append(table) + + return table_results + + @timer_with_counter + def fetch_search_results_with_field(self, *, + query_term: str, + field_name: str, + field_value: str, + page_index: int = 0) -> SearchResult: + """ + Query Atlas and return results as list of Table objects. + Per field name we have a count query and a query for the tables. 
+ https://atlas.apache.org/Search-Advanced.html + + :param query_term: search query term + :param field_name: field name to do the searching(e.g schema_name, tag_names) + :param field_value: value for the field for filtering + :param page_index: index of search page user is currently on + :return: SearchResult Object + :return: + """ + + sql = f"Table from Table where false" + count_sql = f"{sql} select count()" + if field_name == 'tag': + sql = f"from Table where Table is '{field_value}'" + count_sql = f"{sql} select count()" + elif field_name == 'schema': + sql = f"from Table where db.name like '{field_value}'" + count_sql = f"{sql} select count()" + elif field_name == 'table': + sql = f"from Table where name like '{field_value}'" + count_sql = f"{sql} select count()" + elif field_name == 'column': + sql = f"hive_column where name like '{field_value}' select table" + # TODO nanne: count tables instead of columns + count_sql = f"hive_column where name like '{field_value}' select count()" + + LOGGER.debug(f"Used following sql query: {sql}") + tables: List[Table] = [] + count_value = 0 + try: + # count results + count_params = {'query': count_sql} + count_results = list(self.atlas.search_dsl(**count_params))[0] + count_value = count_results._data['attributes']['values'][0][0] + + params = {'query': f"{sql} limit {self.page_size} offset {page_index * self.page_size}"} + search_results = self.atlas.search_dsl(**params) + if count_value > 0 and page_index * self.page_size <= count_value: + # unpack all collections (usually just one collection though) + for collection in search_results: + if hasattr(collection, 'entities'): + tables.extend(self._parse_results(response=collection.entities)) + except BadRequest: + LOGGER.error("Atlas Search DSL error with the following query:", sql) + + return SearchResult(total_results=count_value, results=tables) + + @timer_with_counter + def fetch_search_results(self, *, + query_term: str, + page_index: int = 0) -> SearchResult: + """ + Query Atlas and return results as list of Table objects + We use the Atlas DSL for querying the tables. 
+ https://atlas.apache.org/Search-Advanced.html + + :param query_term: search query term + :param page_index: index of search page user is currently on + :return: SearchResult Object + """ + + if not query_term: + # return empty result for blank query term + return SearchResult(total_results=0, results=[]) + + # define query + sql = f"Table from Table " \ + f"where name like '*{query_term}*' or " \ + f"description like '*{query_term}*' " + + # count amount of tables + count_params = {'query': f"{sql} select count()"} + count_results = list(self.atlas.search_dsl(**count_params))[0] + count_value = count_results._data['attributes']['values'][0][0] + + # select tables + params = { + 'query': f"{sql} " + f"limit {self.page_size} " + f"offset {page_index * self.page_size}"} + search_results = self.atlas.search_dsl(**params) + + # retrieve results + tables = [] + if 0 < count_value >= page_index * self.page_size: + for s in search_results: + tables.extend(self._parse_results(response=s.entities)) + + return SearchResult(total_results=count_value, results=tables) diff --git a/search_service/proxy/base.py b/search_service/proxy/base.py new file mode 100644 index 00000000..cbcc27e1 --- /dev/null +++ b/search_service/proxy/base.py @@ -0,0 +1,24 @@ +from abc import ABCMeta, abstractmethod + +from search_service.models.search_result import SearchResult + + +class BaseProxy(metaclass=ABCMeta): + """ + Base Proxy, which behaves like an interface for all + the proxy clients available in the amundsen search service + """ + + @abstractmethod + def fetch_search_results_with_field(self, *, + query_term: str, + field_name: str, + field_value: str, + page_index: int = 0) -> SearchResult: + pass + + @abstractmethod + def fetch_search_results(self, *, + query_term: str, + page_index: int = 0) -> SearchResult: + pass diff --git a/search_service/proxy/elasticsearch.py b/search_service/proxy/elasticsearch.py index 982929b7..9ce2e248 100644 --- a/search_service/proxy/elasticsearch.py +++ b/search_service/proxy/elasticsearch.py @@ -11,6 +11,7 @@ from search_service import config from search_service.models.search_result import SearchResult from search_service.models.table import Table +from search_service.proxy.base import BaseProxy from search_service.proxy.statsd_utilities import timer_with_counter # Default Elasticsearch index to use, if none specified @@ -22,7 +23,7 @@ LOGGING = logging.getLogger(__name__) -class ElasticsearchProxy: +class ElasticsearchProxy(BaseProxy): """ ElasticSearch connection handler """ diff --git a/setup.py b/setup.py index d1413fcd..0c233b62 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -__version__ = '1.0.0' +__version__ = '1.0.1' setup( diff --git a/tests/unit/proxy/test_atlas.py b/tests/unit/proxy/test_atlas.py new file mode 100644 index 00000000..f1cf80ec --- /dev/null +++ b/tests/unit/proxy/test_atlas.py @@ -0,0 +1,301 @@ +import string +import unittest + +from mock import MagicMock +from typing import List, Callable, Tuple + +from search_service import create_app +from search_service.models.search_result import SearchResult +from search_service.models.table import Table + + +class TestAtlasProxy(unittest.TestCase): + maxDiff = None + + def setUp(self): + self.app = create_app(config_module_class='search_service.config.LocalConfig') + self.app_context = self.app.app_context() + self.app_context.push() + self.qn = False + with self.app_context: + from search_service.proxy.atlas import AtlasProxy + self.proxy = 
AtlasProxy(host='DOES_NOT_MATTER:0000') + self.proxy.atlas = MagicMock() + self.qn = self.app.config['ATLAS_NAME_ATTRIBUTE'] == "qualifiedName" + self.entity_type = 'TEST_ENTITY' + self.cluster = 'TEST_CLUSTER' + self.db = 'TEST_DB' + self.name = 'TEST_TABLE' + self.table_uri = f'{self.entity_type}://{self.cluster}.{self.db}/{self.name}' + + self.classification_entity = { + 'classifications': [ + {'typeName': 'PII_DATA', 'name': 'PII_DATA'}, + ] + } + self.test_column = { + 'guid': 'DOESNT_MATTER', + 'typeName': 'COLUMN', + 'attributes': { + 'qualifiedName': f"{self.db}.Table1.column@{self.cluster}", + 'type': 'Managed', + 'description': 'column description', + 'position': 1 + } + + } + self.db_entity = { + 'guid': '-100', + 'typeName': self.entity_type, + 'attributes': { + 'qualifiedName': self.db + "@" + self.cluster, + 'name': self.db, + 'clusterName': self.cluster + } + } + self.entity1 = { + 'guid': '1', + 'typeName': self.entity_type, + 'classificationNames': [ + 'PII_DATA' + ], + 'relationshipAttributes': { + 'db': self.db_entity + }, + 'attributes': { + 'updateTime': 123, + 'name': 'Table1', + 'qualifiedName': f"{self.db}.Table1@{self.cluster}", + 'classifications': [ + {'typeName': 'PII_DATA'} + ], + 'description': 'Dummy Description', + 'owner': 'dummy@email.com', + 'columns': [self.test_column], + 'db': self.db_entity + } + } + self.entity1.update(self.classification_entity) + + self.entity2 = { + 'guid': '2', + 'typeName': self.entity_type, + 'classificationNames': [], + 'attributes': { + 'updateTime': 234, + 'qualifiedName': f"Table2", + 'name': 'Table2', + 'db': None, + 'description': 'Dummy Description', + 'owner': 'dummy@email.com', + } + } + self.entity2.update(self.classification_entity) + self.entities = { + 'entities': [ + self.entity1, + self.entity2, + ], + } + + def _qualified(self, kind, name, table=None): + if not self.qn: + return name + if kind == "db": + return f"{name}@{self.cluster}" + if kind == "column" and table: + return f"{self.db}.{table}.{name}@{self.cluster}" + if kind == "table": + return f"{self.db}.{name}@{self.cluster}" + return name + + @staticmethod + def recursive_mock(start: any): + """ + The atlas client allows retrieval of data via __getattr__. + That is why we build this method to recursively mock dictionary's to add + the __getattr__ and to convert them into MagicMock. 
+ :param start: dictionary, list, or other + :return: MagicMock, list with mocked items, or other + """ + if isinstance(start, dict): + dict_mock = MagicMock() + dict_mock.__getitem__.side_effect = start.__getitem__ + dict_mock.__iter__.side_effect = start.__iter__ + dict_mock.__contains__.side_effect = start.__contains__ + dict_mock.get.side_effect = start.get + for key, value in start.items(): + value_mock = TestAtlasProxy.recursive_mock(value) + dict_mock.__setattr__(key, value_mock) + start[key] = value_mock + return dict_mock + elif isinstance(start, (list,)): + return list(map(TestAtlasProxy.recursive_mock, start)) + else: + return start + + @staticmethod + def dsl_inject(checks: List[Tuple[Callable[[str], bool], dict]]): + """ + helper method for returning results based on sql queries + :param checks: + :return: + """ + def search_dsl(query: string): + for check, data in checks: + if check(query): + response = MagicMock() + d = TestAtlasProxy.recursive_mock(data) + d.__iter__.return_value = [d] + d._data = { + 'queryType': "DSL", + 'queryText': query, + **data + } + response.__iter__.return_value = [d] + + return response + raise Exception(f"query not supported: {query}") + + return search_dsl + + @staticmethod + def bulk_inject(entities: dict): + """ + provide an entity_bulk method for atlas + :param entities: + :return: + """ + + def guid_filter(guid: List): + return TestAtlasProxy.recursive_mock([{ + 'entities': list(filter(lambda x: x['guid'] in guid, entities)) + }]) + + return guid_filter + + def test_search_normal(self): + expected = SearchResult(total_results=1, + results=[Table(name=self._qualified('table', 'Table1'), + key=f"TEST_ENTITY://TEST_CLUSTER.{self._qualified('db', 'TEST_DB')}/" + f"{self._qualified('table', 'Table1')}", + description='Dummy Description', + cluster='TEST_CLUSTER', + database='TEST_ENTITY', + schema_name=self._qualified('db', 'TEST_DB'), + column_names=[ + # 'column@name' + ], + tags=['PII_DATA'], + last_updated_epoch=123), + Table(name='Table2', + key=f"TEST_ENTITY://./Table2", + description='Dummy Description', + cluster='', + database='TEST_ENTITY', + schema_name='', + column_names=[ + # 'column@name' + ], + tags=[], + last_updated_epoch=234), + ]) + self.proxy.atlas.search_dsl = self.dsl_inject( + [ + (lambda dsl: "select count()" in dsl and "Table" in dsl, + {"attributes": {"name": ["count()"], "values": [[2]]}}), + (lambda dsl: "Table" in dsl and any(x in dsl for x in ["select table", "from Table"]), + {'entities': [self.entity1, self.entity2]}) + ] + ) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.entity2, + self.db_entity + ]) + resp = self.proxy.fetch_search_results(query_term="Table") + self.assertTrue(resp.total_results == 2, "there should be 2 search result") + self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!") + self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]), + "Search Result doesn't match with expected result!") + self.assertDictEqual(vars(resp.results[1]), vars(expected.results[1]), + "Search Result doesn't match with expected result!") + + def test_search_empty(self): + expected = SearchResult(total_results=0, + results=[]) + self.proxy.atlas.search_dsl = self.dsl_inject([ + (lambda dsl: "select count()" in dsl, + {"attributes": {"name": ["count()"], "values": [[0]]}}), + (lambda dsl: any(x in dsl for x in ["select table", "from Table"]), + {'entities': []}) + ]) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + 
self.entity2, + self.db_entity + ]) + resp = self.proxy.fetch_search_results(query_term="Table1") + self.assertTrue(resp.total_results == 0, "there should no search results") + self.assertIsInstance(resp, SearchResult, "Search result received is not of 'SearchResult' type!") + self.assertDictEqual(vars(resp), vars(expected), + "Search Result doesn't match with expected result!") + + def test_search_fields(self): + fields = ['tag', 'schema', 'table', 'column'] + for field in fields: + + expected = SearchResult(total_results=1, + results=[Table(name=self._qualified('table', 'Table1'), + key=f"TEST_ENTITY://TEST_CLUSTER.{self._qualified('db', 'TEST_DB')}/" + f"{self._qualified('table', 'Table1')}", + description='Dummy Description', + cluster='TEST_CLUSTER', + database='TEST_ENTITY', + schema_name=self._qualified('db', 'TEST_DB'), + column_names=[ + # 'column@name' + ], + tags=['PII_DATA'], + last_updated_epoch=123)]) + self.proxy.atlas.search_dsl = self.dsl_inject( + [ + (lambda dsl: "select count()" in dsl, + {"attributes": {"name": ["count()"], "values": [[1]]}}), + (lambda dsl: any(x in dsl for x in ["select table", "from Table", "hive_column"]), + {'entities': [self.entity1]}) + ] + ) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.db_entity + ]) + resp = self.proxy.fetch_search_results_with_field( + query_term=field + "Table1", + field_name=field, + field_value="Table1" + ) + self.assertTrue(resp.total_results == 1, "there should be 1 search result") + self.assertIsInstance(resp.results[0], Table, "Search result received is not of 'Table' type!") + self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]), + "Search Result doesn't match with expected result!") + + def test_unknown_field(self): + expected = SearchResult(total_results=0, + results=[]) + self.proxy.atlas.search_dsl = self.dsl_inject([ + (lambda dsl: "select count()" in dsl, + {"attributes": {"name": ["count()"], "values": [[0]]}}), + (lambda dsl: any(x in dsl for x in ["select table", "from Table"]), + {'entities': []}) + ]) + self.proxy.atlas.entity_bulk = self.bulk_inject([ + self.entity1, + self.entity2, + self.db_entity + ]) + resp = self.proxy.fetch_search_results(query_term="unknown:Table1") + self.assertTrue(resp.total_results == 0, "there should no search results") + self.assertIsInstance(resp, SearchResult, "Search result received is not of 'SearchResult' type!") + self.assertDictEqual(vars(resp), vars(expected), + "Search Result doesn't match with expected result!") From 732c9a3e8f67e4a1b7acafc22d385f330cbf3dcd Mon Sep 17 00:00:00 2001 From: Jin Hyuk Chang Date: Fri, 17 May 2019 15:16:22 -0700 Subject: [PATCH 11/13] [DPTOOLS-2252] Publish Docker image in CI (#26) --- .travis.yml | 10 +++++++++- Makefile | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index e7a84ea0..7db61981 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ script: after_success: - codecov deploy: - provider: pypi +- provider: pypi user: amundsen-dev password: secure: 
ZGfmHZUxxwWVfzKam4R0tzeb80SvopPkkCq1qQw5VIeKhVNHoE0CVyNa85Yo5xsvZSAYHOfGbCP29c6Y6c4+VlXmJQ+1majOFiwzM/kFau2dIaa965N5OXMU/AMtYlw68SpXo/jNlxNSDQwJtd8xEUyaHAS5fc5VvmXD05r4/2SvoodN9TrmkbwzZ8ujJTXG6U3vC3rR5fEIL5WDsdWrvk/Q6b6lvIDqJZ/RXYotauQeG1EXmyB4VroSjYO4+l9Zry+S46gYHWYEidQH1W1UrBRePXwqIH6VAH97PbsCFw6mvUeSkQYJHcSkB+wLUeN+HMcQOk6F3ke9NRkolx4LgD6N+AlXSz0wLta3IUgOAvbUZdxKnBKhjCftWoZx1SWYLtbqIJXEwC51dEQBUd/7GQJuIvXFnNDVz1yKB4zuYqx8v1a5D5uLT69152K14nTB6zu23eRvgPpXUlewAdfyqntOWwFBgK8KMeY54L3IglP+MiebP6smvhm0k5Fk1m7muqzmz3/1/if9UGXCnabG2Yml4MwutRR4PXuN8g9pOgRTQmHd3SV7Y8O9RIL+b8tbagwbpI3CLojO+9b52T7gqrV0d64I4g0WB2M7FsNZBhRuC9Wu7o8a+R9Ft5aK/wF2tRDd+VdurUbBlhhVgdJGWWxkmgk87kqGJjk25Hjm5Oo= @@ -18,3 +18,11 @@ deploy: tags: true distributions: sdist bdist_wheel repo: lyft/amundsensearchlibrary +- provider: script + script: docker login -u amundsendev -p $DOCKER_LOGIN_PASSWORD && make build-push-image + on: + branch: master + tags: true +env: + global: + secure: beSR8ZRLfnRGfO3xTZo24ETvkPMRmrejS8toR1TfCmcAUZRYrgo7qQ2WaNhCESqyVTY0VcGguamhb8kA8BZ9/WgnCXXqGUfoRhEcViqgpTY0oTnB0Jc0EN4JqTVDAGXio0JdejGQa2LiDfjGZSFCsQkg1GlmN+EzmhliA0VKKpeYJKJS4uCLCEULRkUG7pVvYCEPaVkLrVuj0Jv9nLhfPrgLsIxSQVySRklB6raooEWhFAgH2TMWL6XD/5Hb4kbtx7Diookmlz/lzTxxq8wdOlRAC9tysofkGYIw+KCndi2zVztBvmvewMYMaY4H+MbF5sI1ld7a0nc3xy0QWjamBOgreqalDay0Cj1I2NHcuHW4G4ynHR6uzo8GZInT8owQiKblWlgSLh+DvmYj6EaoIBzEBZz8ULw++0ICJhQjntj+c+qNiSvqK5GTsuCK6eMEPCuf/izDQzJQccV11N5f6qkXKn54XrjvHXHuc+Cq4Np9oObKzQeQlUYugWudWI1GJw5eKhcgSJoDYNNWbjXJhXWpBjyiXvhlwtvVIz7umqhd2B7JihSHR5HOY9Ublk1XAGCuhMXwo5HywFAIiaYoNu0BWsXZL93MaVL98+JkVTpino7IHi3TX0CcpU5Qbt4sHwoerHYbcqA/7XiItH27pV7vO6VH84nZIQ3lNk1SdG8= diff --git a/Makefile b/Makefile index d6d09b04..ef2b843a 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,6 @@ +IMAGE := amundsendev/amundsen-search +VERSION:= $(shell grep -m 1 '__version__' setup.py | cut -d '=' -f 2 | tr -d "'" | tr -d '[:space:]') + .PHONY: test clean: find . -name \*.pyc -delete @@ -18,3 +21,16 @@ mypy: .PHONY: test test: test_unit lint mypy + +.PHONY: image +image: + docker build -f public.Dockerfile -t ${IMAGE}:${VERSION} . 
+ docker tag ${IMAGE}:${VERSION} ${IMAGE}:latest + +.PHONY: push-image +push-image: + docker push ${IMAGE}:${VERSION} + docker push ${IMAGE}:latest + +.PHONY: build-push-image +build-push-image: image push-image From 982a41a2a3d1e5099621260b739ba6ed013aaf25 Mon Sep 17 00:00:00 2001 From: Nanne Date: Mon, 20 May 2019 22:16:02 +0200 Subject: [PATCH 12/13] Fix #24, correct initialisation of elastic search (#27) * Fix #24, correct initialisation of elastic search * Update setup.py --- search_service/config.py | 6 +- search_service/proxy/__init__.py | 11 +++- search_service/proxy/atlas.py | 3 +- search_service/proxy/elasticsearch.py | 76 ++++------------------- setup.py | 2 +- tests/unit/proxy/test_elasticsearch.py | 25 +++++++- tests/unit/proxy/test_statsd_utilities.py | 2 +- 7 files changed, 51 insertions(+), 74 deletions(-) diff --git a/search_service/config.py b/search_service/config.py index 30d7e9da..014eec8a 100644 --- a/search_service/config.py +++ b/search_service/config.py @@ -1,10 +1,6 @@ import os -ELASTICSEARCH_ENDPOINT_KEY = 'ELASTICSEARCH_ENDPOINT' ELASTICSEARCH_INDEX_KEY = 'ELASTICSEARCH_INDEX' -ELASTICSEARCH_AUTH_USER_KEY = 'ELASTICSEARCH_AUTH_USER' -ELASTICSEARCH_AUTH_PW_KEY = 'ELASTICSEARCH_AUTH_PW' -ELASTICSEARCH_CLIENT_KEY = 'ELASTICSEARCH_CLIENT' SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE' STATS_FEATURE_KEY = 'STATS' @@ -12,6 +8,7 @@ PROXY_USER = 'PROXY_USER' PROXY_PASSWORD = 'PROXY_PASSWORD' PROXY_CLIENT = 'PROXY_CLIENT' +PROXY_CLIENT_KEY = 'PROXY_CLIENT_KEY' PROXY_CLIENTS = { 'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy', 'ATLAS': 'search_service.proxy.atlas.AtlasProxy' @@ -51,5 +48,6 @@ class LocalConfig(Config): PORT=PROXY_PORT) ) PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')] + PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic') PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic') diff --git a/search_service/proxy/__init__.py b/search_service/proxy/__init__.py index 933b6ea2..6ba0245b 100644 --- a/search_service/proxy/__init__.py +++ b/search_service/proxy/__init__.py @@ -9,6 +9,8 @@ _proxy_client = None _proxy_client_lock = Lock() +DEFAULT_PAGE_SIZE = 10 + def get_proxy_client() -> BaseProxy: """ @@ -24,12 +26,17 @@ def get_proxy_client() -> BaseProxy: if _proxy_client: return _proxy_client else: + obj = current_app.config[config.PROXY_CLIENT_KEY] + # Gather all the configuration to create a Proxy Client host = current_app.config[config.PROXY_ENDPOINT] user = current_app.config[config.PROXY_USER] password = current_app.config[config.PROXY_PASSWORD] - client = import_string(current_app.config[config.PROXY_CLIENT]) - _proxy_client = client(host=host, index=None, user=user, password=password) + + # number of results per search page + page_size = current_app.config.get(config.SEARCH_PAGE_SIZE_KEY, DEFAULT_PAGE_SIZE) + + _proxy_client = client(host=host, index=None, user=user, password=password, client=obj, page_size=page_size) return _proxy_client diff --git a/search_service/proxy/atlas.py b/search_service/proxy/atlas.py index 0f996162..c5f376fe 100644 --- a/search_service/proxy/atlas.py +++ b/search_service/proxy/atlas.py @@ -12,7 +12,6 @@ from search_service.proxy import BaseProxy from search_service.proxy.statsd_utilities import timer_with_counter -DEFAULT_PAGE_SIZE = 10 LOGGER = logging.getLogger(__name__) @@ -33,7 +32,7 @@ def __init__(self, *, index: str = None, user: str = '', password: str = '', - page_size: int = 
DEFAULT_PAGE_SIZE) -> None: + page_size: int = 10) -> None: self.atlas = Atlas(host, username=user, password=password) self.index = index self.page_size = page_size diff --git a/search_service/proxy/elasticsearch.py b/search_service/proxy/elasticsearch.py index 9ce2e248..ed2861c7 100644 --- a/search_service/proxy/elasticsearch.py +++ b/search_service/proxy/elasticsearch.py @@ -1,11 +1,8 @@ -from typing import List # noqa: F401 -from threading import Lock import logging import re from elasticsearch import Elasticsearch from elasticsearch_dsl import Search, query - from flask import current_app from search_service import config @@ -17,9 +14,6 @@ # Default Elasticsearch index to use, if none specified DEFAULT_ES_INDEX = 'table_search_index' -# default search page size -DEFAULT_PAGE_SIZE = 10 - LOGGING = logging.getLogger(__name__) @@ -29,11 +23,12 @@ class ElasticsearchProxy(BaseProxy): """ def __init__(self, *, host: str = None, + user: str = '', index: str = None, - auth_user: str = '', - auth_pw: str = '', - elasticsearch_client: Elasticsearch = None, - page_size: int = DEFAULT_PAGE_SIZE) -> None: + password: str = '', + client: Elasticsearch = None, + page_size: int = 10 + ) -> None: """ Constructs Elasticsearch client for interactions with the cluster. Allows caller to pass a fully constructed Elasticsearch client, {elasticsearch_client} @@ -46,21 +41,21 @@ def __init__(self, *, :param elasticsearch_client: Elasticsearch client to use, if provided :param page_size: Number of search results to return per request """ - if elasticsearch_client: - self.elasticsearch = elasticsearch_client + if client: + self.elasticsearch = client else: self.elasticsearch = self._create_client_from_credentials(host=host, - auth_user=auth_user, - auth_pw=auth_pw) + user=user, + password=password) - self.index = index + self.index = index or current_app.config.get(config.ELASTICSEARCH_INDEX_KEY, DEFAULT_ES_INDEX) self.page_size = page_size @staticmethod def _create_client_from_credentials(*, host: str = None, - auth_user: str = '', - auth_pw: str = '') -> Elasticsearch: + user: str = '', + password: str = '') -> Elasticsearch: """ Construct Elasticsearch client that connects to cluster at {host} and authenticates using {auth_user} and {auth_pw} @@ -68,7 +63,7 @@ def _create_client_from_credentials(*, the client :return: Elasticsearch client object """ - return Elasticsearch(host, http_auth=(auth_user, auth_pw)) + return Elasticsearch(host, http_auth=(user, password)) def _get_search_result(self, page_index: int, client: Search) -> SearchResult: @@ -243,48 +238,3 @@ def fetch_search_results(self, *, return self._search_helper(query_term=query_term, page_index=page_index, client=s) - - -_elasticsearch_proxy = None -_elasticsearch_lock = Lock() - - -def get_elasticsearch_proxy() -> ElasticsearchProxy: - """ - Fetch ElasticSearch proxy instance. 
Use a lock to create an instance - if one doesn't exist - :return: ElasticSearchProxy instance - """ - global _elasticsearch_proxy - - # elasticsearch cluster host to connect to - host = current_app.config.get(config.ELASTICSEARCH_ENDPOINT_KEY, None) - - # elasticsearch index - index = current_app.config.get(config.ELASTICSEARCH_INDEX_KEY, DEFAULT_ES_INDEX) - - # user name and password to connect to elasticsearch cluster - auth_user = current_app.config.get(config.ELASTICSEARCH_AUTH_USER_KEY, '') - auth_pw = current_app.config.get(config.ELASTICSEARCH_AUTH_PW_KEY, '') - - # fully constructed client object to use, if provided - elasticsearch_client = current_app.config.get(config.ELASTICSEARCH_CLIENT_KEY, None) - - # number of results per search page - page_size = current_app.config.get(config.SEARCH_PAGE_SIZE_KEY, DEFAULT_PAGE_SIZE) - - if _elasticsearch_proxy: - return _elasticsearch_proxy - - with _elasticsearch_lock: - if _elasticsearch_proxy: - return _elasticsearch_proxy - else: - _elasticsearch_proxy = ElasticsearchProxy(host=host, - index=index, - auth_user=auth_user, - auth_pw=auth_pw, - elasticsearch_client=elasticsearch_client, - page_size=page_size) - - return _elasticsearch_proxy diff --git a/setup.py b/setup.py index 0c233b62..0b40e83e 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -__version__ = '1.0.1' +__version__ = '1.0.2' setup( diff --git a/tests/unit/proxy/test_elasticsearch.py b/tests/unit/proxy/test_elasticsearch.py index 385b4cbe..b7f2ac9b 100644 --- a/tests/unit/proxy/test_elasticsearch.py +++ b/tests/unit/proxy/test_elasticsearch.py @@ -1,8 +1,10 @@ import unittest from unittest.mock import patch, MagicMock + from typing import Iterable from search_service import create_app +from search_service.proxy import get_proxy_client from search_service.proxy.elasticsearch import ElasticsearchProxy from search_service.models.search_result import SearchResult from search_service.models.table import Table @@ -38,7 +40,7 @@ def setUp(self) -> None: self.app_context.push() mock_elasticsearch_client = MagicMock() - self.es_proxy = ElasticsearchProxy(elasticsearch_client=mock_elasticsearch_client) + self.es_proxy = ElasticsearchProxy(client=mock_elasticsearch_client) self.mock_result1 = MockSearchResult(table_name='test_table', table_key='test_key', @@ -70,6 +72,27 @@ def setUp(self) -> None: tags=['match'], last_updated_epoch=1527283287) + def test_setup_client(self) -> None: + self.es_proxy = ElasticsearchProxy( + host="http://0.0.0.0:9200", + index="random123", + user="elastic", + password="elastic" + ) + a = self.es_proxy.elasticsearch + for client in [a, a.cat, a.cluster, a.indices, a.ingest, a.nodes, a.snapshot, a.tasks]: + self.assertEqual(client.transport.hosts[0]['host'], "0.0.0.0") + self.assertEqual(client.transport.hosts[0]['port'], 9200) + self.assertEqual(self.es_proxy.index, "random123") + + def test_setup_config(self) -> None: + es: ElasticsearchProxy = get_proxy_client() + a = es.elasticsearch + for client in [a, a.cat, a.cluster, a.indices, a.ingest, a.nodes, a.snapshot, a.tasks]: + self.assertEqual(client.transport.hosts[0]['host'], "0.0.0.0") + self.assertEqual(client.transport.hosts[0]['port'], 9200) + self.assertEqual(self.es_proxy.index, "table_search_index") + @patch('elasticsearch_dsl.Search.execute') def test_search_with_empty_query_string(self, mock_search: MagicMock) -> None: diff --git a/tests/unit/proxy/test_statsd_utilities.py b/tests/unit/proxy/test_statsd_utilities.py index dce818fa..17bb7c42 100644 
--- a/tests/unit/proxy/test_statsd_utilities.py +++ b/tests/unit/proxy/test_statsd_utilities.py @@ -60,7 +60,7 @@ def test_with_elasticsearch_proxy(self, mock_search: MagicMock) -> None: mock_elasticsearch_client = MagicMock() - es_proxy = ElasticsearchProxy(elasticsearch_client=mock_elasticsearch_client) + es_proxy = ElasticsearchProxy(client=mock_elasticsearch_client) with patch.object(statsd_utilities, '_get_statsd_client') as mock_statsd_client: mock_success_incr = MagicMock() From 6ffc57032ef83197b3d642ccad02b31a633c0d18 Mon Sep 17 00:00:00 2001 From: Jakub Hettler Date: Thu, 20 Jun 2019 13:41:11 +0200 Subject: [PATCH 13/13] Add metrics/dashboards --- search_service/api/search.py | 47 ++++++++++++++- search_service/config.py | 2 +- search_service/models/dashboard.py | 31 ++++++++++ search_service/models/metric.py | 31 ++++++++++ search_service/models/search_result.py | 5 +- search_service/proxy/elasticsearch.py | 79 +++++++++++++++++++++----- search_service/search_wsgi.py | 2 +- 7 files changed, 178 insertions(+), 19 deletions(-) create mode 100644 search_service/models/dashboard.py create mode 100644 search_service/models/metric.py diff --git a/search_service/api/search.py b/search_service/api/search.py index 47928cca..9d46de79 100644 --- a/search_service/api/search.py +++ b/search_service/api/search.py @@ -1,6 +1,6 @@ from typing import Iterable, Any -from flask_restful import Resource, fields, marshal_with, reqparse +from flask_restful import Resource, fields, marshal_with, reqparse, marshal from search_service.proxy import get_proxy_client @@ -19,9 +19,52 @@ "last_updated_epoch": fields.Integer, } +dashboard_fields = { + "dashboard_group": fields.String, + "dashboard_name": fields.String, + # description can be empty, if no description is present in DB + "description": fields.String, + "last_reload_time": fields.String, + "user_id": fields.String, + "user_name": fields.String, + "tags": fields.List(fields.String) +} + +metric_fields = { + "dashboard_group": fields.String, + "dashboard_name": fields.String, + "metric_name": fields.String, + "metric_function": fields.String, + # description can be empty, if no description is present in DB + "metric_description": fields.String, + "metric_type": fields.String, + "metric_group": fields.String +} + +table_result_fields = { + "result_count": fields.Integer, + "results": fields.List(fields.Nested(table_fields), default=[]) +} + +dashboard_result_fields = { + "result_count": fields.Integer, + "results": fields.List(fields.Nested(dashboard_fields), default=[]) +} + +metric_result_fields = { + "result_count": fields.Integer, + "results": fields.List(fields.Nested(metric_fields), default=[]) +} + +result_fields = { + "dashboards": fields.Nested(dashboard_result_fields), + "tables": fields.Nested(table_result_fields), + "metrics": fields.Nested(metric_result_fields), +} + search_results = { "total_results": fields.Integer, - "results": fields.Nested(table_fields, default=[]) + "results": fields.Nested(result_fields) } diff --git a/search_service/config.py b/search_service/config.py index 014eec8a..34951c23 100644 --- a/search_service/config.py +++ b/search_service/config.py @@ -33,7 +33,7 @@ class Config: ATLAS_NAME_ATTRIBUTE = 'qualifiedName' # Config used by ElastichSearch - ELASTICSEARCH_INDEX = 'table_search_index' + ELASTICSEARCH_INDEX = '_all' class LocalConfig(Config): diff --git a/search_service/models/dashboard.py b/search_service/models/dashboard.py new file mode 100644 index 00000000..743d4811 --- /dev/null +++ 
b/search_service/models/dashboard.py @@ -0,0 +1,31 @@ +from typing import Iterable + + +class Dashboard: + def __init__(self, *, + dashboard_group: str, + dashboard_name: str, + description: str, + last_reload_time: list, + user_id: str, + user_name: str, + tags: str) -> None: + self.dashboard_group = dashboard_group + self.dashboard_name = dashboard_name + self.description = description + self.last_reload_time = last_reload_time + self.user_id = user_id + self.user_name = user_name + self.tags = tags + + def __repr__(self) -> str: + return 'Dashboard(dashboard_group={!r}, dashboard_name={!r}, ' \ + 'description={!r}, last_reload_time={!r}, user_id={!r},' \ + 'user_name={!r}, tags={!r})' \ + .format(self.dashboard_group, + self.dashboard_name, + self.description, + self.last_reload_time, + self.user_id, + self.user_name, + self.tags) diff --git a/search_service/models/metric.py b/search_service/models/metric.py new file mode 100644 index 00000000..541338ab --- /dev/null +++ b/search_service/models/metric.py @@ -0,0 +1,31 @@ +from typing import Iterable + + +class Metric: + def __init__(self, *, + dashboard_group: str, + dashboard_name: str, + metric_name: str, + metric_function: list, + metric_description: str, + metric_type: str, + metric_group: str) -> None: + self.dashboard_group = dashboard_group + self.dashboard_name = dashboard_name + self.metric_name = metric_name + self.metric_function = metric_function + self.metric_description = metric_description + self.metric_type = metric_type + self.metric_group = metric_group + + def __repr__(self) -> str: + return 'Metric(dashboard_group={!r}, dashboard_name={!r}, ' \ + 'metric_name={!r}, metric_function={!r}, metric_description={!r},' \ + 'metric_type={!r}, metric_group={!r})' \ + .format(self.dashboard_group, + self.dashboard_name, + self.metric_name, + self.metric_function, + self.metric_description, + self.metric_type, + self.metric_group) diff --git a/search_service/models/search_result.py b/search_service/models/search_result.py index 901c9268..e50dda16 100644 --- a/search_service/models/search_result.py +++ b/search_service/models/search_result.py @@ -1,11 +1,12 @@ -from typing import List +from typing import List, Dict from search_service.models.table import Table +from search_service.models.dashboard import Dashboard class SearchResult: def __init__(self, *, total_results: int, - results: List[Table]) -> None: + results: Dict) -> None: self.total_results = total_results self.results = results diff --git a/search_service/proxy/elasticsearch.py b/search_service/proxy/elasticsearch.py index ed2861c7..98f039a2 100644 --- a/search_service/proxy/elasticsearch.py +++ b/search_service/proxy/elasticsearch.py @@ -8,6 +8,8 @@ from search_service import config from search_service.models.search_result import SearchResult from search_service.models.table import Table +from search_service.models.dashboard import Dashboard +from search_service.models.metric import Metric from search_service.proxy.base import BaseProxy from search_service.proxy.statsd_utilities import timer_with_counter @@ -74,29 +76,70 @@ def _get_search_result(self, page_index: int, :param client :return: """ + dashboard_results = [] table_results = [] + metric_results = [] # Use {page_index} to calculate index of results to fetch from start_from = page_index * self.page_size end_at = start_from + self.page_size client = client[start_from:end_at] response = client.execute() - for hit in response: + table_count = 0 + dashboard_count = 0 + metric_count = 0 - table = 
Table(name=hit.table_name, - key=hit.table_key, - description=hit.table_description, - cluster=hit.cluster, - database=hit.database, - schema_name=hit.schema_name, - column_names=hit.column_names, - tags=hit.tag_names, - last_updated_epoch=hit.table_last_updated_epoch) + for hit in response: - table_results.append(table) + if hit.meta.doc_type == 'table': + table_count += 1 + + table = Table(name=hit.table_name, + key=hit.table_key, + description=hit.table_description, + cluster=hit.cluster, + database=hit.database, + schema_name=hit.schema_name, + column_names=hit.column_names, + tags=hit.tag_names, + last_updated_epoch=hit.table_last_updated_epoch) + + table_results.append(table) + + elif hit.meta.doc_type == 'dashboard': + dashboard_count += 1 + + dashboard = Dashboard(dashboard_group=hit.dashboard_group, + dashboard_name=hit.dashboard_name, + description=hit.description, + last_reload_time=hit.last_reload_time, + user_id=hit.user_id, + user_name=hit.user_name, + tags=hit.tags) + + dashboard_results.append(dashboard) + + elif hit.meta.doc_type == 'metric': + metric_count += 1 + + metric = Metric(dashboard_group=hit.dashboard_group, + dashboard_name=hit.dashboard_name, + metric_name=hit.metric_name, + metric_function=hit.metric_function, + metric_description=hit.metric_description, + metric_type=hit.metric_type, + metric_group=hit.metric_group) + + metric_results.append(metric) + + results = { + "dashboards": {"result_count": dashboard_count, "results": dashboard_results}, + "tables": {"result_count": table_count, "results": table_results}, + "metrics": {"result_count": metric_count, "results": metric_results} + } return SearchResult(total_results=response.hits.total, - results=table_results) + results=results) def _search_helper(self, query_term: str, page_index: int, @@ -134,6 +177,16 @@ def _search_helper(self, query_term: str, } } } + + d = { + "function_score": { + "query": { + "multi_match": { + "query": query_term + } + } + } + } q = query.Q(d) client = client.query(q) @@ -206,7 +259,7 @@ def fetch_search_results_with_field(self, *, field_name = self._field_name_transform(field_name=field_name) # We allow user to use ? * for wildcard support - m = re.search('[\?\*]', field_value) + m = re.search(r'[\?\*]', field_value) if m: return self._search_wildcard_helper(field_value=field_value, page_index=page_index, diff --git a/search_service/search_wsgi.py b/search_service/search_wsgi.py index 8132e007..2a433b67 100644 --- a/search_service/search_wsgi.py +++ b/search_service/search_wsgi.py @@ -12,4 +12,4 @@ application = create_app(config_module_class=config_module_class) if __name__ == "__main__": - application.run(host='0.0.0.0') + application.run(host='0.0.0.0', port=5001)
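
Editor's note (not part of the patch series): PATCH 13/13 replaces the flat list of table results with three nested buckets — `tables`, `dashboards`, and `metrics` — each carrying its own `result_count`, and moves the local WSGI server to port 5001. The snippet below is a minimal, hypothetical client sketch showing how such a grouped payload could be consumed. The base URL, the `/search` path, and the `query_term`/`page_index` parameter names are assumptions inferred from the patched `search_wsgi.py` and the marshalled `result_fields`; adjust them to match the actual deployment.

```python
# Hypothetical client sketch for the grouped search response added in PATCH 13/13.
# Assumptions (not confirmed by the patches): endpoint path '/search', query
# parameters 'query_term' and 'page_index', and the local address/port taken
# from the patched search_wsgi.py (host 0.0.0.0, port 5001).
import requests

SEARCH_BASE = 'http://0.0.0.0:5001'  # assumed local dev address


def search_all(query_term: str, page_index: int = 0) -> dict:
    """Query the search service and return the raw JSON payload."""
    response = requests.get(
        f'{SEARCH_BASE}/search',
        params={'query_term': query_term, 'page_index': page_index},
    )
    response.raise_for_status()
    return response.json()


if __name__ == '__main__':
    payload = search_all('test')
    results = payload.get('results', {}) or {}
    # Each bucket mirrors dashboard_result_fields / table_result_fields /
    # metric_result_fields: a result_count plus a list of marshalled models
    # (defaulting to an empty list when no hits of that doc_type matched).
    for entity in ('tables', 'dashboards', 'metrics'):
        bucket = results.get(entity, {}) or {}
        print(entity, bucket.get('result_count', 0))
```

Because the marshaller declares `default=[]` on every nested `results` list, a bucket with no hits serializes as an empty list rather than being omitted, so a client like the sketch above can iterate all three buckets unconditionally.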