diff --git a/.gitignore b/.gitignore
index da3896ab..fd072931 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,6 @@ eggs/
 .eggs/
 .coverage*
 .idea
+.vscode
+venv*
 .DS_Store
diff --git a/crawler/crawler.conf b/crawler/crawler.conf
index 90d36be6..0896f347 100644
--- a/crawler/crawler.conf
+++ b/crawler/crawler.conf
@@ -3,27 +3,27 @@
 #enabled_emitter_plugins = Stdout Emitter, File Emitter
 
 [ crawlers ]
-    [[ os_container ]]
-        target = CONTAINER
-
-    [[ process_container ]]
+    [[ os_container ]]
+        target = CONTAINER
+
+    [[ process_container ]]
         get_mmap_files = False
 
     [[ cpu_container ]]
         # False for [0-100%]; True for [0-NCPU*100%] == docker stats type CPU usage
         metric_type_absolute = True
-
-    [[ os_vm ]]
-
-    [[ process_vm ]]
-
-    [[ os_host ]]
-
-    [[ process_host ]]
-
-    [[ ruby_pkg ]]
-
-    [[ python_pkg ]]
+
+    [[ os_vm ]]
+
+    [[ process_vm ]]
+
+    [[ os_host ]]
+
+    [[ process_host ]]
+
+    [[ ruby_pkg ]]
+
+    [[ python_pkg ]]
         avoid_setns = False
 
     [[ fprobe_container ]]
@@ -72,12 +72,17 @@
     [[ Stdout Emitter ]]
         arg_from_conf = 1
         format = csv
-
+
     [[ File Emitter ]]
         url = file://tmp/crawler-out
         format = csv
         arg_from_conf = 2
 
+    [[ Elastic Emitter ]]
+        url = elastic://localhost:9200
+        format = json
+        arg_from_conf = 2
+
     [[ SAS Https Emitter ]]
         token_filepath = /etc/sas-secrets/token
         access_group_filepath = /etc/sas-secrets/access_group
diff --git a/crawler/plugins/emitters/es_emitter.plugin b/crawler/plugins/emitters/es_emitter.plugin
new file mode 100644
index 00000000..38fe3582
--- /dev/null
+++ b/crawler/plugins/emitters/es_emitter.plugin
@@ -0,0 +1,8 @@
+[Core]
+Name = Elastic Emitter
+Module = es_emitter
+
+[Documentation]
+Author = Mahesh Babu Gorantla
+Version = 0.1
+Description = Plugin to emit frames to Elastic Search Index
diff --git a/crawler/plugins/emitters/es_emitter.py b/crawler/plugins/emitters/es_emitter.py
new file mode 100644
index 00000000..0c403511
--- /dev/null
+++ b/crawler/plugins/emitters/es_emitter.py
@@ -0,0 +1,215 @@
+from __future__ import print_function
+import logging
+import subprocess
+import traceback
+from json import loads
+from datetime import datetime
+from sys import executable
+
+try:
+    from elasticsearch import Elasticsearch
+    from elasticsearch.helpers import bulk
+except ImportError:
+    # Best-effort install at import time when the client library is missing.
+    # Note: the class is named 'Elasticsearch'; importing 'ElasticSearch'
+    # would raise ImportError even with the package installed.
+    subprocess.check_call([executable, "-m", "pip", "install", "elasticsearch"])
+    from elasticsearch import Elasticsearch
+    from elasticsearch.helpers import bulk
+
+from iemit_plugin import IEmitter
+
+logger = logging.getLogger('crawlutils')
+
+
+class ElasticEmitter(IEmitter):
+    """
+    Emitter to index crawler frames into an Elasticsearch index
+    """
+
+    def init(self, url, timeout=1, max_retries=5, emit_format='csv'):
+        IEmitter.init(
+            self,
+            url,
+            timeout=timeout,
+            max_retries=max_retries,
+            emit_format=emit_format
+        )
+        if emit_format == 'json':
+            self.emit_per_line = True
+
+        self.url = url
+        self.elastic_index_name = self._get_elastic_index_name()
+        self.elastic_engine = self._get_elasticsearch()
+
+    def get_emitter_protocol(self):
+        return "elastic"
+
+    def _get_elasticsearch(self):
+        """Returns an Elasticsearch client
+
+        Returns:
+            Elasticsearch -- An Elasticsearch client
+        """
+        # Swap the 'elastic://' scheme for 'http://'
+        url = "http{}".format(self.url[len(self.get_emitter_protocol()):])
+        return Elasticsearch([url])
+
+    def _get_elastic_index_name(self, prefix_identifier=None):
+        """Returns the name of the elasticsearch index
+
+        Keyword Arguments:
+            prefix_identifier {str} -- Identifier prefix name for the
+                elasticsearch index (default: {None})
+
+        Returns:
+            str -- Name of the Elasticsearch index
+        """
+
+        if not prefix_identifier:
+            prefix_identifier = 'deploy-bom'
+        return "{}-{}".format(prefix_identifier,
+                              datetime.utcnow().strftime("%Y.%m.%d"))
+
+    def emit(self, frame, compress=False, metadata={}, snapshot_num=0,
+             **kwargs):
+        """
+        A wrapper function used by the crawler to index the frame into an
+        elasticsearch index
+        """
+
+        bulk_queue_size = 128
+
+        try:
+            frame_ = self.format(frame)
+
+            if self.emit_per_line:
+                frame_.seek(0)
+
+            # Work on a copy so the caller's metadata dict (and the shared
+            # default) is never mutated, then drop the redundant system
+            # metadata fields.
+            metadata = dict(metadata)
+            ignore_metadata_keys = ['uuid', 'features', 'namespace']
+
+            for key in ignore_metadata_keys:
+                metadata.pop(key, None)
+
+            user_metadata_fields = [str(key) for key in metadata.keys()]
+
+            self._bulk_insert_frame(
+                frame=frame_,
+                metadata_keys=user_metadata_fields,
+                max_queue_size=bulk_queue_size
+            )
+
+        except Exception as error:
+            traceback.print_exc()
+            print(error)
+
+    def __add_metadata(self, metadata=None, user_metadata_keys=None,
+                       json_document=None):
+        """Adds user specified metadata_keys to each json_document
+
+        Keyword Arguments:
+            metadata {dict} -- Metadata from crawler and user (default: {None})
+            user_metadata_keys {list} -- List of custom user metadata fields
+                (default: {None})
+            json_document {dict} -- JSON formatted document (default: {None})
+        """
+        if not isinstance(metadata, dict):
+            raise TypeError("'metadata' should be of {}".format(dict))
+
+        if not isinstance(user_metadata_keys, list):
+            raise TypeError(
+                "'user_metadata_keys' should be of {}".format(list))
+
+        if not isinstance(json_document, dict):
+            raise TypeError("'json_document' should be of {}".format(dict))
+
+        for key in user_metadata_keys:
+            json_document[key] = metadata.get(key, None)
+
+        return json_document
+
+    def __gen_elastic_document(self, source_field_body=None):
+        """Formats source_field_body into an elastic document
+
+        Keyword Arguments:
+            source_field_body {dict} -- Crawler frame document (default: {None})
+        """
+
+        if not isinstance(source_field_body, dict):
+            raise TypeError(
+                "'source_field_body' should be of {}".format(dict))
+
+        _elastic_document = {
+            "_index": self.elastic_index_name,
+            "_type": "doc",
+            "_source": source_field_body
+        }
+
+        return _elastic_document
+
+    def _gen_elastic_documents(self, frame=None, metadata_keys=None):
+        """Adds metadata_keys to each doc in the frame and formats each
+        doc into an elastic document
+
+        Keyword Arguments:
+            frame {cStringIO} -- Crawler frame (default: {None})
+            metadata_keys {list} -- List of custom user metadata fields
+                (default: {None})
+        """
+
+        if not isinstance(metadata_keys, list):
+            raise TypeError("'metadata_keys' should be of {}".format(list))
+
+        try:
+            # The first line of the frame carries both crawler and user
+            # specified metadata fields
+            system_metadata = loads(frame.readline())
+
+            for doc in frame:
+                formatted_json_document = loads(doc.strip())
+                _formatted_doc = self.__add_metadata(
+                    metadata=system_metadata,
+                    user_metadata_keys=metadata_keys,
+                    json_document=formatted_json_document
+                )
+                elastic_document = self.__gen_elastic_document(_formatted_doc)
+                yield elastic_document
+
+        except ValueError as value_error:
+            print("Invalid JSON formatting in frame")
+            print(value_error)
+
+    def _bulk_insert_frame(self, frame=None, metadata_keys=None,
+                           max_queue_size=64):
+        """Bulk insert the crawler frame into the elasticsearch index
+
+        Keyword Arguments:
+            frame {cStringIO} -- Crawler frame (default: {None})
+            metadata_keys {list} -- List of custom user metadata fields
+                (default: {None})
+            max_queue_size {int} -- Maximum number of documents to queue
+                before performing a bulk insert (default: {64})
+        """
+
+        if not isinstance(metadata_keys, list):
+            raise TypeError("'metadata_keys' should be of {}".format(list))
+
+        if not isinstance(max_queue_size, int):
+            raise TypeError("'max_queue_size' should be of {}".format(int))
+
+        bulk_queue = []
+        queue_size = 0
+
+        elastic_documents = self._gen_elastic_documents(
+            frame=frame,
+            metadata_keys=metadata_keys
+        )
+
+        for document in elastic_documents:
+            bulk_queue.append(document)
+            queue_size = (queue_size + 1) % max_queue_size
+
+            if queue_size == 0:
+                bulk(
+                    self.elastic_engine,
+                    bulk_queue,
+                    request_timeout=30,
+                    max_retries=5
+                )
+                del bulk_queue[:]  # Empty the queue
+
+        # NOTE: The number of documents in the frame might not always be
+        # divisible by max_queue_size; index any leftover documents in
+        # the bulk_queue.
+        if bulk_queue:
+            bulk(
+                self.elastic_engine,
+                bulk_queue,
+                request_timeout=30,
+                max_retries=5
+            )
diff --git a/tests/unit/test_es_emitter.py b/tests/unit/test_es_emitter.py
new file mode 100644
index 00000000..5c91c434
--- /dev/null
+++ b/tests/unit/test_es_emitter.py
@@ -0,0 +1,107 @@
+import sys
+sys.path.append('crawler')
+
+from unittest import TestCase, main
+from plugins.emitters.es_emitter import ElasticEmitter
+from base_crawler import BaseFrame
+
+
+class TestElasticEmitter(TestCase):
+
+    def setUp(self):
+
+        self.frame = BaseFrame(feature_types=["os", "disk",
+                                              "process", "package"])
+        self.frame.metadata['namespace'] = '192.168.1.221'
+        self.system_type = "host"
+        self.frame.metadata['system_type'] = self.system_type
+
+        self.extra_metadata = {"field1": 123, "field2": "abc"}
+        self.frame.metadata.update(self.extra_metadata)
+
+        self.frame.add_features([
+            (
+                "os",
+                {"boottime": 1594481866.0,
+                 "uptime": 8066.0,
+                 "ipaddr": ["127.0.0.1", "192.168.1.221"],
+                 "os": "ubuntu",
+                 "os_version": "18.04",
+                 "os_kernel": "Linux-4.15.0-109-generic-x86_64-with-Ubuntu-18.04-bionic",
+                 "architecture": "x86_64"},
+                "os"
+            ),
+            (
+                "disk",
+                {"partitionname": "proc",
+                 "freepct": 100.0,
+                 "fstype": "proc",
+                 "mountpt": "/proc",
+                 "mountopts": "rw,nosuid,nodev,noexec,relatime",
+                 "partitionsize": 0},
+                "disk"
+            ),
+            (
+                "process",
+                {"cmd": "/usr/lib/postgresql/10/bin/postgres -D /var/lib/postgresql/10/main -c config_file=/etc/postgresql/10/main/postgresql.conf",
+                 "created": 1594481947.07,
+                 "cwd": "/var/lib/postgresql/10/main",
+                 "pname": "postgres",
+                 "openfiles": ["/var/log/postgresql/postgresql-10-main.log",
+                               "/var/log/postgresql/postgresql-10-main.log",
+                               "/var/log/postgresql/postgresql-10-main.log",
+                               "/var/log/postgresql/postgresql-10-main.log"],
+                 "mmapfiles": [],
+                 "pid": 1831,
+                 "ppid": 1,
+                 "threads": 1,
+                 "user": "postgres"},
+                "process"
+            ),
+            (
+                "package",
+                {"installed": None,
+                 "pkgname": "postgresql-10",
+                 "pkgsize": "14816",
+                 "pkgversion": "10.12-0ubuntu0.18.04.1",
+                 "pkgarchitecture": "amd64"},
+                "package"
+            )
+        ])
+
+        self.emitter = ElasticEmitter()
+        self.emitter.init(
+            url="elastic://localhost:9200",
+            emit_format='json'
+        )
+
+    def test_gen_elastic_documents(self):
+        frame = self.emitter.format(self.frame)
+
+        all_metadata_keys = self.frame.metadata.keys()
+
+        es_metadata_keys = []
+        existing_keys_in_each_frame = {"uuid", "features", "namespace"}
+
+        for key in all_metadata_keys:
+            if key not in existing_keys_in_each_frame:
+                es_metadata_keys.append(key)
+
+        if self.emitter.emit_per_line:
+            frame.seek(0)
+
+        elastic_docs = self.emitter._gen_elastic_documents(
+            frame, es_metadata_keys)
+        for es_doc, doc in zip(elastic_docs, self.frame.data):
+            doc[1]['system_type'] = self.system_type
+            doc[1]['timestamp'] = self.frame.metadata['timestamp']
+            for key, value in self.extra_metadata.items():
+                doc[1][key] = value
+            self.assertEqual(es_doc['_source'], doc[1])
+
+    def tearDown(self):
+        del self.frame
+        del self.emitter
+
+
+if __name__ == '__main__':
+    main()
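Usage note (not part of the patch): a minimal sketch of driving the new emitter by hand, assuming an Elasticsearch instance is reachable at localhost:9200; the frame contents and the 'field1' metadata key are illustrative. Documents land in a date-suffixed index, e.g. deploy-bom-2020.07.11.

    import sys
    sys.path.append('crawler')

    from base_crawler import BaseFrame
    from plugins.emitters.es_emitter import ElasticEmitter

    # Build a one-feature frame, as the crawler would
    frame = BaseFrame(feature_types=['os'])
    frame.metadata['namespace'] = '192.168.1.221'
    frame.metadata['field1'] = 123  # custom user metadata
    frame.add_features([('os', {'os': 'ubuntu', 'os_version': '18.04'}, 'os')])

    # 'json' format switches the emitter to one-document-per-line mode
    emitter = ElasticEmitter()
    emitter.init(url='elastic://localhost:9200', emit_format='json')

    # The 'field1' key names the metadata field that gets copied from the
    # frame's metadata line into every indexed document
    emitter.emit(frame, metadata={'field1': 123})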