From 57025e74e1f943457c1011923fa8c6750c2d0a7c Mon Sep 17 00:00:00 2001 From: Isaac Prior Date: Thu, 21 Oct 2021 17:56:29 +0100 Subject: [PATCH] Improves support for Elasticsearch storage. Requires Elasticsearch 7. Provides an index template to manage cloudkitty document mappings with the option to include user provided component templates. Uses dated indices and an index alias to allow for better management of data. Change-Id: Ie7301ebccc3a72876abcb5c4cc8c07ceaaa29e29 --- .../storage/v2/elasticsearch/__init__.py | 73 +++++++--- cloudkitty/storage/v2/elasticsearch/client.py | 121 +++++++++++++++- .../storage/v2/elasticsearch/exceptions.py | 8 ++ .../storage/v2/elasticsearch/test_client.py | 132 ++++++++++++++++++ cloudkitty/tests/storage/v2/es_utils.py | 5 +- devstack/plugin.sh | 44 ++++-- doc/source/_static/cloudkitty.conf.sample | 8 ++ ...-index-alias-support-2bcdacff9ee91524.yaml | 17 +++ 8 files changed, 371 insertions(+), 37 deletions(-) create mode 100644 releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml diff --git a/cloudkitty/storage/v2/elasticsearch/__init__.py b/cloudkitty/storage/v2/elasticsearch/__init__.py index 058e9126..b6673873 100644 --- a/cloudkitty/storage/v2/elasticsearch/__init__.py +++ b/cloudkitty/storage/v2/elasticsearch/__init__.py @@ -37,21 +37,35 @@ default='http://localhost:9200'), cfg.StrOpt( 'index_name', - help='Elasticsearch index to use. Defaults to "cloudkitty".', + help='Elasticsearch index to use. ' + 'Defaults to "cloudkitty".', default='cloudkitty'), - cfg.BoolOpt('insecure', - help='Set to true to allow insecure HTTPS ' - 'connections to Elasticsearch', - default=False), - cfg.StrOpt('cafile', - help='Path of the CA certificate to trust for ' - 'HTTPS connections.', - default=None), - cfg.IntOpt('scroll_duration', - help="Duration (in seconds) for which the ES scroll contexts " - "should be kept alive.", - advanced=True, - default=30, min=0, max=300), + cfg.StrOpt( + 'template_name', + help='Elasticsearch template name to use. ' + 'Defaults to "cloudkitty_mapping".', + default='cloudkitty_mapping'), + cfg.ListOpt( + 'component_templates', + help='List of Elasticsearch component template ' + 'names to include in the index template. ', + default=[]), + cfg.BoolOpt( + 'insecure', + help='Set to true to allow insecure HTTPS ' + 'connections to Elasticsearch', + default=False), + cfg.StrOpt( + 'cafile', + help='Path of the CA certificate to trust for ' + 'HTTPS connections.', + default=None), + cfg.IntOpt( + 'scroll_duration', + help="Duration (in seconds) for which the ES scroll contexts " + "should be kept alive.", + advanced=True, + default=30, min=0, max=300) ] CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP) @@ -100,14 +114,31 @@ def __init__(self, *args, **kwargs): verify=verify) def init(self): - r = self._conn.get_index() - if r.status_code != 200: - raise exceptions.IndexDoesNotExist( - CONF.storage_elasticsearch.index_name) - LOG.info('Creating mapping "_doc" on index {}...'.format( + LOG.info('Creating index template for mapping.') + index_pattern = "{}-*".format(CONF.storage_elasticsearch.index_name) + component_templates = CONF.storage_elasticsearch.component_templates + index_template = self._conn.build_index_template( + index_pattern, component_templates, CLOUDKITTY_INDEX_MAPPING) + self._conn.put_index_template( + CONF.storage_elasticsearch.template_name, index_template) + LOG.info('Index template for mapping created.') + + # If index_name exists, test to ensure it is an alias + if self._conn.exists_index(): + if not self._conn.is_index_alias(): + raise exceptions.IndexAliasAlreadyExists( + CONF.storage_elasticsearch.index_name) + LOG.info('Index alias already exists. Skipping creation.') + + # Otherwise create a dated index with index_name as an alias + else: + LOG.info('Creating first index.') + self._conn.put_first_index() + + # Rollover index on startup + LOG.info('Rolling over index {}'.format( CONF.storage_elasticsearch.index_name)) - self._conn.put_mapping(CLOUDKITTY_INDEX_MAPPING) - LOG.info('Mapping created.') + self._conn.post_index_rollover() def push(self, dataframes, scope_id=None): for frame in dataframes: diff --git a/cloudkitty/storage/v2/elasticsearch/client.py b/cloudkitty/storage/v2/elasticsearch/client.py index 79651b8c..6b3e78a9 100644 --- a/cloudkitty/storage/v2/elasticsearch/client.py +++ b/cloudkitty/storage/v2/elasticsearch/client.py @@ -92,7 +92,8 @@ def _build_should(filters): {'term': {'metadata.' + k: v}}] return should - def _build_composite(self, groupby): + @staticmethod + def _build_composite(groupby): if not groupby: return [] sources = [] @@ -147,11 +148,121 @@ def _req(self, method, url, data, params, deserialize=True): self._log_query(url, data, output) return output + def _req_unsafe(self, method, url, data, params): + """Request without exception on invalid HTTP codes.""" + return method(url, data=data, params=params) + + def _req_exists(self, url, data, params): + r = self._sess.head(url, data=data, params=params) + if r.status_code == 200: + return True + elif r.status_code == 404: + return False + else: + raise exceptions.InvalidStatusCode( + "200/404", r.status_code, r.text, None) + + @staticmethod + def build_index_template(index_pattern, component_templates, mapping): + """Build an index template for mapping.""" + # High priority template to avoid being overridden. + template_priority = 500 + return { + "index_patterns": [index_pattern], + "priority": template_priority, + "composed_of": component_templates, + "template": { + "mappings": mapping + } + } + + def put_index_template(self, template_name, template): + """Does a PUT request against the ES template API. + + Does a PUT request against `/_template/` + if es6 or `/_index_template/` if es7. + + :param template_name: index template name + :type template_name: string + :param template: index template + :type template: dict + :rtype: requests.models.Response + """ + url = '/'.join( + (self._url, '_index_template', template_name)) + data = json.dumps(template) + LOG.debug('Creating index template {} with data:\n{}'.format( + template_name, data)) + return self._req( + self._sess.put, url, data, None, deserialize=False) + + def put_first_index(self): + """Does a PUT request against the ES index API. + + Does a PUT request against `/-{now/d}-000001`. + + Creates a dated index with an alias for which it is the write index. + + :rtype: requests.models.Response + """ + # Percent encode the / (%2F) in the date math for the index name + index_string = "<{}{}>".format(self._index_name, "-{now%2Fd}-000001") + url = '/'.join((self._url, index_string)) + aliases = { + "aliases": { + self._index_name: { + "is_write_index": True + } + } + } + LOG.debug('Creating index {} with data:\n{}'.format( + index_string, json.dumps(aliases))) + return self._req( + self._sess.put, url, json.dumps(aliases), None, deserialize=False) + + def post_index_rollover(self): + """Does a POST request against the ES index API. + + Does a POST request against `//_rollover`. + + Performs a rollover of the index alias. + + :rtype: requests.models.Response + """ + url = '/'.join((self._url, self._index_name, '_rollover')) + self._req(self._sess.post, url, None, None, deserialize=False) + + def exists_index(self): + """Does a HEAD request against the ES index API. + + Does a HEAD request against `/`. + + Tests if an index or index alias exists. + + :rtype: Boolean + """ + url = '/'.join((self._url, self._index_name)) + param = {"allow_no_indices": "false"} + return self._req_exists(url, None, param) + + def is_index_alias(self): + """Does a HEAD request against the ES alias API. + + Does a HEAD request against `/_alias/`. + + Tests if an index alias exists. + + :rtype: Boolean + """ + url = '/'.join((self._url, '_alias', self._index_name)) + return self._req_exists(url, None, None) + def put_mapping(self, mapping): """Does a PUT request against ES's mapping API. - The PUT request will be done against - `//_mapping/` + Does a PUT request against `//_mapping/` + + Creates or updates an index mapping. :mapping: body of the request :type mapping: dict @@ -168,7 +279,7 @@ def put_mapping(self, mapping): def get_index(self): """Does a GET request against ES's index API. - The GET request will be done against `/` + Does a GET request against `/` :rtype: requests.models.Response """ @@ -360,7 +471,7 @@ def total(self, begin, end, metric_types, filters, groupby, must = self._build_must(begin, end, metric_types, filters) should = self._build_should(filters) - composite = self._build_composite(groupby) if groupby else None + composite = self._build_composite(groupby) if composite: composite['size'] = self._chunk_size query = self._build_query(must, should, composite) diff --git a/cloudkitty/storage/v2/elasticsearch/exceptions.py b/cloudkitty/storage/v2/elasticsearch/exceptions.py index 201d7f9d..7497e67d 100644 --- a/cloudkitty/storage/v2/elasticsearch/exceptions.py +++ b/cloudkitty/storage/v2/elasticsearch/exceptions.py @@ -30,3 +30,11 @@ def __init__(self, index_name): super(IndexDoesNotExist, self).__init__( "Elasticsearch index {} does not exist".format(index_name) ) + + +class IndexAliasAlreadyExists(BaseElasticsearchException): + def __init__(self, index_name): + super(IndexAliasAlreadyExists, self).__init__( + "Elasticsearch index alias {} already exists as an index".format( + index_name) + ) diff --git a/cloudkitty/tests/storage/v2/elasticsearch/test_client.py b/cloudkitty/tests/storage/v2/elasticsearch/test_client.py index 8948be91..2feb7c17 100644 --- a/cloudkitty/tests/storage/v2/elasticsearch/test_client.py +++ b/cloudkitty/tests/storage/v2/elasticsearch/test_client.py @@ -20,6 +20,7 @@ from dateutil import tz from cloudkitty import dataframe +import cloudkitty.storage.v2.elasticsearch from cloudkitty.storage.v2.elasticsearch import client from cloudkitty.storage.v2.elasticsearch import exceptions @@ -181,6 +182,137 @@ def test_req_invalid_status_code(self): self.client._req, method_mock, None, None, None) + def test_req_unsafe(self): + url = '/endpoint' + data = {'1': 'one'} + params = {'v'} + resp_mock = mock.MagicMock() + resp_mock.status_code = 400 + method_mock = mock.MagicMock() + method_mock.return_value = resp_mock + req_resp = self.client._req_unsafe( + method_mock, url, data, params) + method_mock.assert_called_once_with( + url, data=data, params=params) + self.assertEqual(req_resp, resp_mock) + + def test_req_exists(self): + url = '/endpoint' + data = {'1': 'one'} + params = {'v'} + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.client._req_exists( + url, data=data, params=params) + hmock.assert_called_once_with( + url, data=data, params=params) + + def test_req_exists_true(self): + url = '/endpoint' + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.assertTrue(self.client._req_exists( + url, data=None, params=None)) + + def test_req_exists_false(self): + url = '/endpoint' + resp_mock = mock.MagicMock() + resp_mock.status_code = 404 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.assertFalse(self.client._req_exists( + url, data=None, params=None)) + + def test_req_exists_exception(self): + url = '/endpoint' + resp_mock = mock.MagicMock() + resp_mock.status_code = 418 # I'm a teapot + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + self.assertRaises(exceptions.InvalidStatusCode, + self.client._req_exists, + url, data=None, params=None) + + def test_build_index_template(self): + index_pattern = "cloudkitty-*" + mapping = cloudkitty.storage.v2.elasticsearch.CLOUDKITTY_INDEX_MAPPING + component_templates = ["cloudkitty_settings"] + expected = { + "index_patterns": ["cloudkitty-*"], + "priority": 500, + "composed_of": component_templates, + "template": { + "mappings": mapping + } + } + self.assertEqual( + self.client.build_index_template( + index_pattern, component_templates, mapping), expected) + + def test_put_index_template(self): + template_name = 'test_template' + template = { + "index_patterns": ["index_name-*"], + "priority": 500, + "template": { + "mappings": "fake_mapping" + } + } + expected_data = \ + ('{"index_patterns": ["index_name-*"], "priority": 500, ' + '"template": {"mappings": "fake_mapping"}}') + with mock.patch.object(self.client, '_req') as rmock: + self.client.put_index_template( + template_name, template) + rmock.assert_called_once_with( + self.client._sess.put, + 'http://elasticsearch:9200/_index_template/test_template', + expected_data, None, deserialize=False) + + def test_put_first_index(self): + expected_data = '{"aliases": {"index_name": {"is_write_index": true}}}' + with mock.patch.object(self.client, '_req') as rmock: + self.client.put_first_index() + rmock.assert_called_once_with( + self.client._sess.put, + 'http://elasticsearch:9200/', + expected_data, None, deserialize=False) + + def test_post_index_rollover(self): + with mock.patch.object(self.client, '_req') as rmock: + self.client.post_index_rollover() + rmock.assert_called_once_with( + self.client._sess.post, + 'http://elasticsearch:9200/index_name/_rollover', + None, None, deserialize=False) + + def test_exists_index(self): + expected_param = {"allow_no_indices": "false"} + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + r = self.client.exists_index() + hmock.assert_called_once_with( + 'http://elasticsearch:9200/index_name', + data=None, params=expected_param) + self.assertTrue(r) + + def test_is_index_alias(self): + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + with mock.patch.object(self.client._sess, 'head') as hmock: + hmock.return_value = resp_mock + r = self.client.is_index_alias() + hmock.assert_called_once_with( + 'http://elasticsearch:9200/_alias/index_name', + data=None, params=None) + self.assertTrue(r) + def test_put_mapping(self): mapping = {'a': 'b'} with mock.patch.object(self.client, '_req') as rmock: diff --git a/cloudkitty/tests/storage/v2/es_utils.py b/cloudkitty/tests/storage/v2/es_utils.py index 40e27c38..1d806453 100644 --- a/cloudkitty/tests/storage/v2/es_utils.py +++ b/cloudkitty/tests/storage/v2/es_utils.py @@ -26,7 +26,10 @@ class FakeElasticsearchClient(client.ElasticsearchClient): def __init__(self, *args, **kwargs): kwargs["autocommit"] = False super(FakeElasticsearchClient, self).__init__(*args, **kwargs) - for method in ('get_index', 'put_mapping'): + for method in ('exists_index', + 'is_index_alias', + 'get_index', + 'put_mapping'): setattr(self, method, self.__base_response) @staticmethod diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 248ec857..87b62c24 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -242,12 +242,6 @@ function create_influxdb_database { fi } -function create_elasticsearch_index { - if [ "$CLOUDKITTY_STORAGE_BACKEND" == "elasticsearch" ]; then - curl -XPUT "${CLOUDKITTY_ELASTICSEARCH_HOST}/${CLOUDKITTY_ELASTICSEARCH_INDEX}" - fi -} - # init_cloudkitty() - Initialize CloudKitty database function init_cloudkitty { # Delete existing cache @@ -264,7 +258,6 @@ function init_cloudkitty { recreate_database cloudkitty utf8 create_influxdb_database - create_elasticsearch_index # Migrate cloudkitty database $CLOUDKITTY_BIN_DIR/cloudkitty-dbsync upgrade @@ -301,15 +294,42 @@ function install_influx { sudo systemctl start influxdb || sudo systemctl restart influxdb } +# Remove Elasticsearch package if present +function _cleanup_elasticsearch_ubuntu { + if sudo dpkg --list elasticsearch; then + sudo dpkg --purge elasticsearch + fi + _cleanup_elasticsearch_data +} + +function _cleanup_elasticsearch_fedora { + if sudo rpm -q elasticsearch; then + sudo yum remove elasticsearch -y + fi + _cleanup_elasticsearch_data +} + +# Remove Elasticsearch data if present +function _cleanup_elasticsearch_data { + if [[ -d /var/lib/elasticsearch ]]; then + sudo rm -rf /var/lib/elasticsearch + fi + if [[ -d /etc/elasticsearch ]]; then + sudo rm -rf /etc/elasticsearch + fi +} + function install_elasticsearch_ubuntu { + _cleanup_elasticsearch_ubuntu sudo apt install -qy openjdk-8-jre - local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.3.deb) - sudo dpkg -i --skip-same-version ${elasticsearch_file} + local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-amd64.deb) + sudo dpkg -i ${elasticsearch_file} } function install_elasticsearch_fedora { + _cleanup_elasticsearch_fedora sudo yum install -y java-1.8.0-openjdk - local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.3.rpm) + local elasticsearch_file=$(get_extra_file https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-x86_64.rpm) sudo yum localinstall -y ${elasticsearch_file} } @@ -322,6 +342,10 @@ function install_elasticsearch { die $LINENO "Distribution must be Debian or Fedora-based" fi sudo systemctl start elasticsearch || sudo systemctl restart elasticsearch + echo "Waiting a minute for Elasticsearch to start..." + if ! wait_for_service 60 http://localhost:9200/; then + die $LINENO "Elasticsearch did not start" + fi } # install_cloudkitty() - Collect source and prepare diff --git a/doc/source/_static/cloudkitty.conf.sample b/doc/source/_static/cloudkitty.conf.sample index 9c00a17b..08f8b556 100644 --- a/doc/source/_static/cloudkitty.conf.sample +++ b/doc/source/_static/cloudkitty.conf.sample @@ -1353,6 +1353,14 @@ # Elasticsearch index to use. Defaults to "cloudkitty". (string value) #index_name = cloudkitty +# Elasticsearch template name to use. Defaults to +# "cloudkitty_mapping". (string value) +#template_name = cloudkitty_mapping + +# List of Elasticsearch component template names to include in the +# index template. (list value) +#component_templates = + # Set to true to allow insecure HTTPS connections to Elasticsearch # (boolean value) #insecure = false diff --git a/releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml b/releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml new file mode 100644 index 00000000..072a10ac --- /dev/null +++ b/releasenotes/notes/add-elasticsearch-index-alias-support-2bcdacff9ee91524.yaml @@ -0,0 +1,17 @@ +--- +prelude: > + Improves support for Elasticsearch storage backend. Requires Elasticsearch 7. +features: + - | + Improves support for Elasticsearch storage backend. Requires Elasticsearch 7. + Provides an index template to manage cloudkitty document mappings with the + option to include user provided component templates. + Uses dated indices and an index alias to allow for better management of data. +upgrade: + - | + Improves support for Elasticsearch storage backend. Requires Elasticsearch 7. + The config setting 'index_name' now refers to the index alias used to + reference the latest cloudkitty index. If no index or index alias exists on + first run then they will be created. If 'index_name' already exists and is + not an alias then initialisation will fail - a simple solution is to use a + new 'index_name' and reindex existing data into the first index created.