From 70bb161611369915bd6a4266c5914aa6822dfe29 Mon Sep 17 00:00:00 2001 From: Walter Schultz Date: Wed, 5 Jun 2024 10:37:31 -0400 Subject: [PATCH] Moved module under geomesa-kafka --- docs/user/kafka/spring_stream_binder.rst | 191 ++++++++++++++++++ docs/user/spring/stream_binder.rst | 182 ----------------- .../pom.xml | 31 ++- .../KafkaDatastoreBinderConfiguration.java | 0 ...atastoreBinderConfigurationProperties.java | 0 .../KafkaDatastoreBinderProvisioner.java | 0 .../KafkaDatastoreMessageBinder.java | 0 .../KafkaDatastoreMessageProducer.java | 0 .../converters/SimpleFeatureConverter.java | 0 .../main/resources/META-INF/spring.binders | 0 .../KafkaDatastoreMessageBinderTest.java | 0 .../src/test/resources/observation.conf | 0 .../src/test/resources/reference.conf | 0 geomesa-kafka/pom.xml | 1 + geomesa-spring/pom.xml | 45 ----- pom.xml | 1 - 16 files changed, 221 insertions(+), 230 deletions(-) create mode 100644 docs/user/kafka/spring_stream_binder.rst delete mode 100644 docs/user/spring/stream_binder.rst rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/pom.xml (65%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/resources/META-INF/spring.binders (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/test/resources/observation.conf (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/test/resources/reference.conf (100%) delete mode 100644 geomesa-spring/pom.xml diff --git a/docs/user/kafka/spring_stream_binder.rst b/docs/user/kafka/spring_stream_binder.rst new file mode 100644 index 000000000000..b92cd8ba6cc9 --- /dev/null +++ b/docs/user/kafka/spring_stream_binder.rst @@ -0,0 +1,191 @@ +Spring Cloud Stream Geomesa Kafka Datastore Binder +================================================== + +The Spring Cloud Stream Geomesa Kafka Datastore Binder provides an easy way for Spring Cloud Stream apps to hook into +Geomesa Kafka Datastore to process events. + +If you are unfamiliar with Spring Cloud Stream, see the official documentation for an introduction: +https://spring.io/projects/spring-cloud-stream + +Input/Output Types +------------ + +This binder will provide all ``KafkaFeatureEvent`` s from kafka datastore to your configured function definitions. Each +function will have to do it's own type comparison to see if the event is a ``KafkaFeatureEvent.KafkaFeatureChanged``, +``KafkaFeatureEvent.KafkaFeatureRemoved``, or another event type. + +The module also ships with a SimpleFeature converter, which allows you to configure function definitions that consume +or produces ``SimpleFeature`` s and avoid working with ``KafkaFeatureEvent`` s directly. + +.. note:: + + The SimpleFeature converter extracts the SimpleFeature out of ``KafkaFeatureEvent.KafkaFeatureChanged`` events and + ignores all others. Any function definition that consumes SimpleFeatures will miss the + ``KafkaFeatureEvent.KafkaFeatureRemoved`` and ``KafkaFeatureEvent.KafkaFeatureCleared`` messages. And any function + definition that only writes SimpleFeatures will not be able to send those messages. + +Configuration +------------- + +The configuration options are under spring.cloud.stream.kafka-datastore.binder. This binder will accept any +configuration options for the standard java geomesa kafka-datastore, with the periods ('.') replaced with dashes ('-'). +For example, to specify kafka.catalog.topic for the binder, set: + +.. code-block:: yaml + + spring: + cloud: + stream: + kafka-datastore: + binder: + kafka-catalog-topic: geomesa-catalog-topic + +For a full list of configuration options, see: https://www.geomesa.org/documentation/stable/user/kafka/usage.html + +Examples +-------- + +Simple Logger App +----------------- + +.. code-block:: java + + @Bean + public Consumer log() { + return obj -> logger.info(obj.toString()); + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: log + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + log-in-0: input + bindings: + input: + destination: messages + group: logger + +Simple Enricher App +------------------- + +.. code-block:: java + + @Bean + public Function attachSourceField() { + return sf -> { + sf.setAttribute("source", "un-labelled source"); + return sf; + }; + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: attachSourceField + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + attachSourceField-in-0: input + attachSourceField-out-0: output + bindings: + input: + destination: un-labelled-source-ob + group: sft-reader + output: + destination: observations + group: sft-writer + +Simple Filter App +------------------- + +.. code-block:: java + + @Bean + public Function excludeMoving() { + return sf -> { + if (sf.getAttribute("status").equals("IN_TRANSIT")) { + return null; + } + return sf; + }; + } + + +.. code-block:: yaml + + spring: + cloud: + function: + definition: filterMoving + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + filterMoving-in-0: input + filterMoving-out-0: output + bindings: + input: + destination: movingAndUnmovingThings + group: sft-reader + output: + destination: unMovingThings + group: sft-writer + +Multiple Datastore App +---------------------- + +In the case of multi-bindings, you simply need to submit override the proper kafka-datastore fields in the environment +field. + +.. code-block:: java + + @Bean + public Function passThrough() { + return event -> event; + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: passThrough + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + passThrough-in-0: input + passThrough-out-0: output + binders: + kds-start: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/start + kds-end: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/end + bindings: + input: + destination: observations + group: sft-reader + binder: kds-start + output: + destination: observations + group: sft-writer + binder: kds-end diff --git a/docs/user/spring/stream_binder.rst b/docs/user/spring/stream_binder.rst deleted file mode 100644 index be5cf896d9da..000000000000 --- a/docs/user/spring/stream_binder.rst +++ /dev/null @@ -1,182 +0,0 @@ -Spring Cloud Stream Geomesa Kafka Datastore Binder -================================================== - -The Spring Cloud Stream Geomesa Kafka Datastore Binder provides an easy way for Spring Cloud Stream apps to hook into -Geomesa Kafka Datastore to process events. - -If you are unfamiliar with Spring Cloud Stream, see the official documentation for an introduction: -https://spring.io/projects/spring-cloud-stream - -Input/Output Types ------------- - -This binder will provide all `KafkaFeatureEvent`s from kafka datastore to your configured function definitions. Each -function will have to do it's own type comparision to see if the event is a `KafkaFeatureEvent.KafkaFeatureChanged`, -`KafkaFeatureEvent.KafkaFeatureRemoved`, or another event type. - -The module also ships with a SimpleFeature converter, which allows you to configure function definitions that consume -or produces `SimpleFeature`s and avoid working with `KafkaFeatureEvent`s directly. - -Note: The SimpleFeature converter extracts the SimpleFeature out of `KafkaFeatureEvent.KafkaFeatureChanged` events and -ignores all others. Any function definition that consumes SimpleFeatures will miss the -`KafkaFeatureEvent.KafkaFeatureRemoved` and ``KafkaFeatureEvent.KafkaFeatureCleared` messages. And any function -definition that only writes SimpleFeatures will not be able to send those messages. - -Configuration -------------- - -The configuration options are under spring.cloud.stream.kafka-datastore.binder. This binder will accept any -configuration options for the standard java geomesa kafka-datastore, with the periods ('.') replaced with dashes ('-'). -For example, to specify kafka.catalog.topic for the binder, set: -```yaml -spring: - cloud: - stream: - kafka-datastore: - binder: - kafka-catalog-topic: geomesa-catalog-topic -``` - -For a full list of configuration options, see: https://www.geomesa.org/documentation/stable/user/kafka/usage.html - -Examples --------- - -Simple Logger App ------------------ -``` -@Bean -public Consumer log() { - return obj -> logger.info(obj.toString()); -} -``` -``` -spring: - cloud: - function: - definition: log - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - log-in-0: input - bindings: - input: - destination: messages - group: logger -``` - -Simple Enricher App -------------------- - -``` -@Bean -public Function attachSourceField() { - return sf -> { - sf.setAttribute("source", "un-labelled source"); - return sf; - }; -} -``` -``` -spring: - cloud: - function: - definition: attachSourceField - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - attachSourceField-in-0: input - attachSourceField-out-0: output - bindings: - input: - destination: un-labelled-source-ob - group: sft-reader - output: - destination: observations - group: sft-writer -``` - -Simple Filter App -------------------- -``` -@Bean -public Function excludeMoving() { - return sf -> { - if (sf.getAttribute("status").equals("IN_TRANSIT")) { - return null; - } - return sf; - }; -} -``` -``` -spring: - cloud: - function: - definition: filterMoving - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - filterMoving-in-0: input - filterMoving-out-0: output - bindings: - input: - destination: movingAndUnmovingThings - group: sft-reader - output: - destination: unMovingThings - group: sft-writer -``` - -Multiple Datastore App ----------------------- - -In the case of multi-bindings, you simply need to submit override the proper kafka-datastore fields in the environment -field. - -``` -@Bean -public Function passThrough() { - return event -> event; -} -``` -``` -spring: - cloud: - function: - definition: passThrough - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - passThrough-in-0: input - passThrough-out-0: output - binders: - kds-start: - type: kafka-datastore - environment: - spring.cloud.stream.kafka-datastore.binder: - kafka-zk-path: geomesa/start - kds-end: - type: kafka-datastore - environment: - spring.cloud.stream.kafka-datastore.binder: - kafka-zk-path: geomesa/end - bindings: - input: - destination: observations - group: sft-reader - binder: kds-start - output: - destination: observations - group: sft-writer - binder: kds-end - -``` \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml similarity index 65% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml index 7a1ed6bf131e..b27a34e0f48b 100644 --- a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml @@ -5,16 +5,19 @@ 4.0.0 org.locationtech.geomesa - geomesa-spring + geomesa-kafka_2.12 5.1.0-SNAPSHOT - geomesa-spring-cloud-stream-binder-kafka-datastore + geomesa-kafka-spring-cloud-stream-binder + GeoMesa Kafka Spring Cloud Stream Binder 11 11 UTF-8 + 2021.0.9 + 2.7.18 @@ -61,6 +64,30 @@ + + org.junit.vintage + junit-vintage-engine + test + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf diff --git a/geomesa-kafka/pom.xml b/geomesa-kafka/pom.xml index 994adb5ca33c..3b8898fde893 100644 --- a/geomesa-kafka/pom.xml +++ b/geomesa-kafka/pom.xml @@ -18,6 +18,7 @@ geomesa-kafka-gs-plugin geomesa-kafka-tools geomesa-kafka-utils + geomesa-kafka-spring-cloud-stream-binder diff --git a/geomesa-spring/pom.xml b/geomesa-spring/pom.xml deleted file mode 100644 index 3028d73d7d7e..000000000000 --- a/geomesa-spring/pom.xml +++ /dev/null @@ -1,45 +0,0 @@ - - - 4.0.0 - - org.locationtech.geomesa - geomesa_2.12 - 5.1.0-SNAPSHOT - - - geomesa-spring - pom - - geomesa-spring-cloud-stream-binder-kafka-datastore - - - - 11 - 11 - UTF-8 - 2021.0.9 - 2.7.18 - - - - - - org.springframework.boot - spring-boot-dependencies - ${spring-boot.version} - pom - import - - - org.springframework.cloud - spring-cloud-dependencies - ${spring-cloud.version} - pom - import - - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4e92f90f7ddc..57b921fb0295 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,6 @@ geomesa-utils-parent geomesa-z3 docs - geomesa-spring