diff --git a/pyproject.toml b/pyproject.toml
index be869be..f835ef3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -97,6 +97,7 @@ thredds = "stac_generator.plugins.inputs.thredds:ThreddsInput"
 text_file = "stac_generator.plugins.inputs.text_file:TextFileInput"
 solr = "stac_generator.plugins.inputs.solr:SolrInput"
 elasticsearch = "stac_generator.plugins.inputs.elasticsearch:ElasticsearchInput"
+kafka = "stac_generator.plugins.inputs.kafka:KafkaInput"
 
 [tool.poetry.plugins."stac_generator.outputs"]
 standard_out = "stac_generator.plugins.outputs.standard_out:StandardOutOutput"
@@ -110,6 +111,7 @@ rabbitmq = "stac_generator.plugins.outputs.rabbit_mq:RabbitMQOutput"
 rabbitmq_bulk = "stac_generator.plugins.bulk_outputs.rabbit_mq:RabbitMQBulkOutput"
 intake_esm = "stac_generator.plugins.outputs.intake_esm:IntakeESMOutput"
 stac_fastapi = "stac_generator.plugins.outputs.stac_fastapi:STACFastAPIOutput"
+kafka = "stac_generator.plugins.outputs.kafka:KafkaOutput"
 
 [tool.poetry.plugins."stac_generator.mappings"]
 ceda = "stac_generator.plugins.mappings.ceda:CEDAMapping"
diff --git a/stac_generator/core/generator.py b/stac_generator/core/generator.py
index 12eeb63..96b54d4 100644
--- a/stac_generator/core/generator.py
+++ b/stac_generator/core/generator.py
@@ -192,7 +192,7 @@ def _process(self, body: dict, **kwargs) -> None:
         :param kwargs:
         """
 
-    def process(self, uri: str, **kwargs) -> None:
+    def process(self, body: dict, **kwargs) -> None:
         """
         Run generator.
 
@@ -201,6 +201,4 @@ def process(self, uri: str, **kwargs) -> None:
         """
         kwargs["TYPE"] = self.TYPE
 
-        body = {"uri": uri}
-
         self._process(body, **kwargs)
diff --git a/stac_generator/plugins/inputs/elasticsearch.py b/stac_generator/plugins/inputs/elasticsearch.py
index e6215e7..8292d7f 100644
--- a/stac_generator/plugins/inputs/elasticsearch.py
+++ b/stac_generator/plugins/inputs/elasticsearch.py
@@ -116,7 +116,7 @@ def run(self, generator: BaseGenerator):
             for bucket in aggregation["buckets"]:
                 uri = bucket["key"]["uri"]
                 if self.should_process(uri):
-                    generator.process(**bucket["key"])
+                    generator.process(bucket["key"])
                     total_generated += 1
 
             if "after_key" not in aggregation.keys():
diff --git a/stac_generator/plugins/inputs/file_system.py b/stac_generator/plugins/inputs/file_system.py
index 9e6f159..d9044d9 100644
--- a/stac_generator/plugins/inputs/file_system.py
+++ b/stac_generator/plugins/inputs/file_system.py
@@ -69,7 +69,7 @@ def run(self, generator: BaseGenerator):
                 filename = os.path.abspath(os.path.join(root, file))
 
                 if self.should_process(filename):
-                    generator.process(filename)
+                    generator.process({"uri": filename})
                     logger.debug(f"Input processing: {filename}")
                 else:
                     logger.debug(f"Input skipping: {filename}")
diff --git a/stac_generator/plugins/inputs/intake_esm.py b/stac_generator/plugins/inputs/intake_esm.py
index 60a4243..118e071 100644
--- a/stac_generator/plugins/inputs/intake_esm.py
+++ b/stac_generator/plugins/inputs/intake_esm.py
@@ -111,7 +111,7 @@ def run(self, generator: BaseGenerator):
             uri = getattr(row, self.object_attr)
 
             if self.should_process(uri):
-                generator.process(uri)
+                generator.process({"uri": uri})
                 LOGGER.debug(f"Input processing: {uri}")
             else:
                 LOGGER.debug(f"Input skipping: {uri}")
diff --git a/stac_generator/plugins/inputs/kafka.py b/stac_generator/plugins/inputs/kafka.py
new file mode 100644
index 0000000..c8d9fd4
--- /dev/null
+++ b/stac_generator/plugins/inputs/kafka.py
@@ -0,0 +1,83 @@
+# encoding: utf-8
+"""
+Kafka
+-----
+
+An input plugin which polls a Kafka event stream.
+
+**Plugin name:** ``kafka``
+
+.. list-table::
+    :header-rows: 1
+
+    * - Option
+      - Value Type
+      - Description
+    * - ``config``
+      - ``dict``
+      - ``REQUIRED`` Configuration for the `Kafka consumer `_
+    * - ``topics``
+      - ``list``
+      - ``REQUIRED`` The topics to poll for messages.
+    * - ``timeout``
+      - ``float``
+      - ``REQUIRED`` The maximum time in seconds to block when polling for messages.
+
+Example configuration:
+    .. code-block:: yaml
+
+        inputs:
+            - method: kafka
+              config:
+                'bootstrap.servers': 'host1:9092,host2:9092'
+              topics:
+                - stac
+"""
+
+import json
+import logging
+
+from confluent_kafka import Consumer, KafkaError, KafkaException
+
+from stac_generator.core.generator import BaseGenerator
+from stac_generator.core.input import BaseInput
+
+LOGGER = logging.getLogger(__name__)
+
+
+class KafkaInput(BaseInput):
+    """
+    Use a Kafka event stream as input to collect messages to pass to
+    the processor.
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.timeout = kwargs["timeout"]
+        self.topics = kwargs["topics"]
+        self.consumer = Consumer(**kwargs["config"])
+
+    def run(self, generator: BaseGenerator):
+        try:
+            self.consumer.subscribe(self.topics)
+            while True:
+                msg = self.consumer.poll(timeout=self.timeout)
+                if msg is None:
+                    continue
+
+                if msg.error():
+                    if msg.error().code() == KafkaError._PARTITION_EOF:
+                        # End of partition event
+                        LOGGER.error(
+                            "%% %s [%d] reached end at offset %d\n"
+                            % (msg.topic(), msg.partition(), msg.offset())
+                        )
+                    else:
+                        raise KafkaException(msg.error())
+
+                else:
+                    data = json.loads(msg.value().decode("utf-8"))
+                    generator.process(data)
+        finally:
+            # Close down consumer to commit final offsets.
+            self.consumer.close()
diff --git a/stac_generator/plugins/inputs/object_store.py b/stac_generator/plugins/inputs/object_store.py
index 3c474b6..4b014d3 100644
--- a/stac_generator/plugins/inputs/object_store.py
+++ b/stac_generator/plugins/inputs/object_store.py
@@ -96,8 +96,7 @@ def run(self, generator: BaseGenerator):
             ):
 
                 generator.process(
-                    f"{self.endpoint_url}/{bucket.name}/{obj.key}",
-                    client=self.client,
+                    {"uri": f"{self.endpoint_url}/{bucket.name}/{obj.key}"}
                 )
                 total_files += 1
 
diff --git a/stac_generator/plugins/inputs/rabbit_mq.py b/stac_generator/plugins/inputs/rabbit_mq.py
index 9b19ebd..2ccb6d3 100644
--- a/stac_generator/plugins/inputs/rabbit_mq.py
+++ b/stac_generator/plugins/inputs/rabbit_mq.py
@@ -310,14 +310,15 @@ def callback(
             self.acknowledge_message(ch, method.delivery_tag, connection)
             return
 
-        # Extract uri
-        uri = message.pop("uri")
+        # Extract body
+        body = message.pop("body")
+        uri = body["uri"]
 
         if self.should_process(uri):
             LOGGER.info("Input processing: %s message: %s", uri, message)
             self.acknowledge_message(ch, method.delivery_tag, connection)
 
-            generator.process(uri, **message)
+            generator.process(body, **message)
 
         else:
             LOGGER.info("Input skipping: %s", uri)
diff --git a/stac_generator/plugins/inputs/solr.py b/stac_generator/plugins/inputs/solr.py
index a03a086..83341df 100644
--- a/stac_generator/plugins/inputs/solr.py
+++ b/stac_generator/plugins/inputs/solr.py
@@ -106,4 +106,4 @@ def run(self, generator: BaseGenerator):
                 # by replacing '.' with '/' up until the filename
                 uri = uri.replace(".", "/", uri.split("|")[0].count(".") - 1)
 
-                generator.process(uri)
+                generator.process({"uri": uri})
diff --git a/stac_generator/plugins/inputs/text_file.py b/stac_generator/plugins/inputs/text_file.py
index 1710b87..f68d486 100644
--- a/stac_generator/plugins/inputs/text_file.py
+++ b/stac_generator/plugins/inputs/text_file.py
@@ -72,7 +72,7 @@ def run(self, generator: BaseGenerator):
                     unique_lines.add(line)
                     data = json.loads(line)
                     try:
-                        generator.process(**data)
+                        generator.process(data)
                     except Exception:
                         errors.write(line)
                         errors.write(traceback.format_exc())
diff --git a/stac_generator/plugins/inputs/thredds.py b/stac_generator/plugins/inputs/thredds.py
index a58aef8..a0c586e 100644
--- a/stac_generator/plugins/inputs/thredds.py
+++ b/stac_generator/plugins/inputs/thredds.py
@@ -138,7 +138,7 @@ def process_ds(self, ds, generator: BaseGenerator):
         filepath = get_sub_attr(ds, self.object_attr)
 
         if self.should_process(filepath):
-            generator.process(filepath)
+            generator.process({"uri": filepath})
             logger.debug(f"Input processing: {filepath}")
         else:
             logger.debug(f"Input skipping: {filepath}")
diff --git a/stac_generator/plugins/outputs/kafka.py b/stac_generator/plugins/outputs/kafka.py
new file mode 100644
index 0000000..af4bff1
--- /dev/null
+++ b/stac_generator/plugins/outputs/kafka.py
@@ -0,0 +1,73 @@
+# encoding: utf-8
+"""
+Kafka
+-----
+
+An output backend which writes the generated metadata to a Kafka event stream.
+
+**Plugin name:** ``kafka``
+
+.. list-table::
+    :header-rows: 1
+
+    * - Option
+      - Value Type
+      - Description
+    * - ``config``
+      - ``dict``
+      - ``REQUIRED`` Configuration for the `Kafka producer `_
+    * - ``topic``
+      - ``str``
+      - ``REQUIRED`` The topic to post the message to.
+    * - ``key_term``
+      - ``str``
+      - Term to be used as the Kafka message key.
+
+Example configuration:
+    .. code-block:: yaml
+
+        outputs:
+            - method: kafka
+              config:
+                'bootstrap.servers': 'host1:9092,host2:9092'
+              topic: stac
+              key_term: item_id
+"""
+__author__ = "Richard Smith"
+__date__ = "01 Jun 2021"
+__copyright__ = "Copyright 2018 United Kingdom Research and Innovation"
+__license__ = "BSD - see LICENSE file in top-level package directory"
+__contact__ = "richard.d.smith@stfc.ac.uk"
+
+import json
+
+from confluent_kafka import Producer
+
+from stac_generator.core.output import BaseOutput
+
+
+class KafkaOutput(BaseOutput):
+    """
+    Output backend which posts the generated
+    metadata to a Kafka topic.
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+        # Default the key term and create the producer
+        if not hasattr(self, "key_term"):
+            self.key_term = "uri"
+        self.producer = Producer(self.config)
+
+    def export(self, data: dict, **kwargs) -> None:
+        """
+        Post the message to the Kafka server.
+
+        :param data: Data from extraction processes
+        :param kwargs: Not used
+        """
+        key = data.get(self.key_term, None)
+        message = json.dumps(data).encode("utf8")
+        self.producer.produce(self.topic, key=key, value=message)
+        self.producer.flush()
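
Usage note (not part of the diff): the new ``KafkaInput`` decodes each message value as UTF-8 JSON and passes the resulting dict straight to ``generator.process``, so producers should publish a JSON body carrying at least a ``uri`` key, matching the bodies the other input plugins now build. A minimal sketch of publishing such a message with ``confluent_kafka`` follows; the broker address, topic name, and file path are illustrative assumptions, not values taken from this change.

.. code-block:: python

    # Illustrative only: publish a body for the new KafkaInput to consume.
    # Broker address, topic, and path are assumptions, not values from this diff.
    import json

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})

    body = {"uri": "/path/to/data/file.nc"}  # hypothetical URI

    # KafkaInput runs json.loads(msg.value().decode("utf-8")) and then calls
    # generator.process(body), so the value must be UTF-8 encoded JSON.
    producer.produce("stac", value=json.dumps(body).encode("utf-8"))
    producer.flush()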