Skip to content

Commit

Permalink
Integrates Atlas DSL Search (#17)
Browse files Browse the repository at this point in the history
* Add base proxy behind elastic search to support other search engines

* Add support for Apache Atlas DSL search

* Add new proxy with the Apache Atlas as backend

* Add document to describing Apache Atlas search methods

* Update setup.py
  • Loading branch information
whazor authored and feng-tao committed May 14, 2019
1 parent 86ea652 commit 1a05eb2
Show file tree
Hide file tree
Showing 11 changed files with 652 additions and 16 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ Proxy package contains proxy modules that talks dependencies of Search service.
##### [Elasticsearch proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/elasticsearch.py "Elasticsearch proxy module")
[Elasticsearch](https://www.elastic.co/products/elasticsearch "Elasticsearch") proxy module serves various use case of searching metadata from Elasticsearch. It uses [Query DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html "Query DSL") for the use case, execute the search query and transform into [model](https://github.com/lyft/amundsensearchlibrary/tree/master/search_service/models "model").

##### [Atlas proxy module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/atlas.py "Atlas proxy module")
[Apache Atlas](https://atlas.apache.org/ "Apache Atlas") proxy module uses Atlas to serve the Atlas requests. At the moment the search DSL REST api is used via the [Python Client](https://atlasclient.readthedocs.io/ "Atlas Client").


##### [Statsd utilities module](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/proxy/statsd_utilities.py "Statsd utilities module")
[Statsd](https://github.com/etsy/statsd/wiki "Statsd") utilities module has methods / functions to support statsd to publish metrics. By default, statsd integration is disabled and you can turn in on from [Search service configuration](https://github.com/lyft/amundsensearchlibrary/blob/master/search_service/config.py#L7 "Search service configuration").
For specific configuration related to statsd, you can configure it through [environment variable.](https://statsd.readthedocs.io/en/latest/configure.html#from-the-environment "environment variable.")
Expand Down
35 changes: 35 additions & 0 deletions docs/atlas-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Atlas search investigation
There are several approaches to integrate searching within [Apache Atlas](https://atlas.apache.org/ "Apache Atlas"), we describe multiple options below:

- Use REST API's

Directly using the Atlas API's is quick to implement and easy to setup for administrators. Atlas uses a search engine
underwater (embedded Solr) to perform search queries, thus in theory this method should scale up. Disadvantages are that
we are limited to the REST api that Atlas offers, we could potentially add functionality via pull requests and extend
the search capabilities. The [advanced search](https://atlas.apache.org/Search-Advanced.html "Apache Atlas Advanced Search")
provides a DSL which contains basic forms of aggregation and arithmetic.

- Use Data Builder to fill Elasticsearch from Atlas

Adopting Atlas within the Data Builder to fill Elasticsearch is a relatively straightforward way of staying
compatible with the Neo4j database. It could either be pulling data from Atlas or being pushed by Kafka. This method
requires a setup of Elasticsearch and Airflow, which increases the amount of infrastructure and maintenance.
Another disadvantage is that with a big inflow of metadata this method might not scale as well as the other methods.

- Use underlying Solr or Elasticsearch from Apache Atlas

Within Atlas there is the possibility to open up either Solr or the experimental Elasticsearch. It depends on janusgraph
(the behind the scenes graph database) which populates the search engine. Therefore the search engine would not be compatible with
the data builder setup. Adoption of such a search engine would require either new queries, some kind of transformer
within the search engine, or changes within Atlas itself.

## Discussion
Both the REST API approach and the data builder approach can be implemented and be configurable. Both approaches have
their own benefits, the data builder together provides a more fine-tuned search whereas the Atlas REST API comes out
of the box with Atlas. The last approach of using the underlying search engine from Atlas provides direct access
to all the meta data with a decent search API. However, integration would be less straight forward as the indexes would
differ from the data builders search engine loader.


The focus is initially to implement the REST API approach and afterwards potentially implement an Atlas data extractor
and importer within the Amundsen Data Builder. So that administrators have more flexibility in combining data sources.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ typing==3.6.4
Werkzeug==0.14.1
wheel==0.31.1
mypy==0.660
atlasclient==0.1.6
10 changes: 5 additions & 5 deletions search_service/api/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from flask_restful import Resource, fields, marshal_with, reqparse

from search_service.proxy import elasticsearch
from search_service.proxy import get_proxy_client

table_fields = {
"name": fields.String,
Expand Down Expand Up @@ -30,7 +30,7 @@ class SearchAPI(Resource):
Search API
"""
def __init__(self) -> None:
self.elasticsearch = elasticsearch.get_elasticsearch_proxy()
self.proxy = get_proxy_client()

self.parser = reqparse.RequestParser(bundle_errors=True)

Expand All @@ -50,7 +50,7 @@ def get(self) -> Iterable[Any]:

try:

results = self.elasticsearch.fetch_search_results(
results = self.proxy.fetch_search_results(
query_term=args['query_term'],
page_index=args['page_index']
)
Expand All @@ -68,7 +68,7 @@ class SearchFieldAPI(Resource):
Search API with explict field
"""
def __init__(self) -> None:
self.elasticsearch = elasticsearch.get_elasticsearch_proxy()
self.proxy = get_proxy_client()

self.parser = reqparse.RequestParser(bundle_errors=True)

Expand All @@ -91,7 +91,7 @@ def get(self, *, field_name: str,
args = self.parser.parse_args(strict=True)

try:
results = self.elasticsearch.fetch_search_results_with_field(
results = self.proxy.fetch_search_results_with_field(
query_term=args.get('query_term'),
field_name=field_name,
field_value=field_value,
Expand Down
41 changes: 32 additions & 9 deletions search_service/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,48 @@
SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE'
STATS_FEATURE_KEY = 'STATS'

PROXY_ENDPOINT = 'PROXY_ENDPOINT'
PROXY_USER = 'PROXY_USER'
PROXY_PASSWORD = 'PROXY_PASSWORD'
PROXY_CLIENT = 'PROXY_CLIENT'
PROXY_CLIENTS = {
'ELASTICSEARCH': 'search_service.proxy.elasticsearch.ElasticsearchProxy',
'ATLAS': 'search_service.proxy.atlas.AtlasProxy'
}


class Config:
LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(levelname)s] %(module)s.%(funcName)s:%(lineno)d (%(process)d:'\
'%(threadName)s) - %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
LOG_LEVEL = 'INFO'

# Used to differentiate tables with other entities in Atlas. For more details:
# https://github.com/lyft/amundsenmetadatalibrary/blob/master/docs/proxy/atlas_proxy.md
ATLAS_TABLE_ENTITY = 'Table'

# The relationalAttribute name of Atlas Entity that identifies the database entity.
ATLAS_DB_ATTRIBUTE = 'db'

# Display name of Atlas Entities that we use for amundsen project.
# Atlas uses qualifiedName as indexed attribute. but also supports 'name' attribute.
ATLAS_NAME_ATTRIBUTE = 'qualifiedName'

# Config used by ElastichSearch
ELASTICSEARCH_INDEX = 'table_search_index'


class LocalConfig(Config):
DEBUG = False
TESTING = False
STATS = False
LOCAL_HOST = '0.0.0.0'
ELASTICSEARCH_PORT = '9200'
ELASTICSEARCH_ENDPOINT = os.environ.get('ELASTICSEARCH_ENDPOINT',
'http://{LOCAL_HOST}:{PORT}'.format(
LOCAL_HOST=LOCAL_HOST,
PORT=ELASTICSEARCH_PORT)
)
ELASTICSEARCH_INDEX = 'table_search_index'
ELASTICSEARCH_AUTH_USER = 'elastic'
ELASTICSEARCH_AUTH_PW = 'elastic'
PROXY_PORT = '9200'
PROXY_ENDPOINT = os.environ.get('PROXY_ENDPOINT',
'http://{LOCAL_HOST}:{PORT}'.format(
LOCAL_HOST=LOCAL_HOST,
PORT=PROXY_PORT)
)
PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')]
PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic')
PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic')
35 changes: 35 additions & 0 deletions search_service/proxy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from threading import Lock

from flask import current_app

from search_service import config
from search_service.proxy.base import BaseProxy
from werkzeug.utils import import_string

_proxy_client = None
_proxy_client_lock = Lock()


def get_proxy_client() -> BaseProxy:
"""
Provides singleton proxy client based on the config
:return: Proxy instance of any subclass of BaseProxy
"""
global _proxy_client

if _proxy_client:
return _proxy_client

with _proxy_client_lock:
if _proxy_client:
return _proxy_client
else:
# Gather all the configuration to create a Proxy Client
host = current_app.config[config.PROXY_ENDPOINT]
user = current_app.config[config.PROXY_USER]
password = current_app.config[config.PROXY_PASSWORD]

client = import_string(current_app.config[config.PROXY_CLIENT])
_proxy_client = client(host=host, index=None, user=user, password=password)

return _proxy_client
Loading

0 comments on commit 1a05eb2

Please sign in to comment.