From a8ecb36c8d3a3fc4333092a5d1c43acdb1e2feda Mon Sep 17 00:00:00 2001 From: Walter Schultz Date: Wed, 5 Jun 2024 14:08:49 -0400 Subject: [PATCH] Added a check for sft in the kds, if it isn't on the classpath --- .../pom.xml | 31 ++++++-- .../KafkaDatastoreBinderConfiguration.java | 4 +- ...atastoreBinderConfigurationProperties.java | 4 +- .../KafkaDatastoreBinderProvisioner.java | 2 +- .../binder}/KafkaDatastoreMessageBinder.java | 9 ++- .../KafkaDatastoreMessageProducer.java | 2 +- .../converters/SimpleFeatureConverter.java | 2 +- .../main/resources/META-INF/spring.binders | 2 +- .../KafkaDatastoreMessageBinderTest.java | 72 +++++++++++++------ 9 files changed, 94 insertions(+), 34 deletions(-) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreBinderConfiguration.java (94%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreBinderConfigurationProperties.java (89%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreBinderProvisioner.java (96%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreMessageBinder.java (92%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreMessageProducer.java (97%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/converters/SimpleFeatureConverter.java (96%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreMessageBinderTest.java (74%) diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml index b27a34e0f48b..cf487d828cb5 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml @@ -13,8 +13,6 @@ GeoMesa Kafka Spring Cloud Stream Binder - 11 - 11 UTF-8 2021.0.9 2.7.18 @@ -32,7 +30,14 @@ org.springframework.boot spring-boot-starter-log4j2 + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.curator @@ -62,11 +67,29 @@ org.apache.logging.log4j log4j-to-slf4j + + ch.qos.logback + logback-classic + + + org.junit.jupiter + junit-jupiter + - org.junit.vintage - junit-vintage-engine + junit + junit + test + + + + + + + + org.slf4j + slf4j-reload4j test diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java similarity index 94% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java index dfd5cf4db7bb..e7bb93592c1c 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java @@ -6,11 +6,11 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; import org.geotools.api.data.DataStoreFinder; -import org.locationtech.geomesa.spring.binder.kafka.datastore.converters.SimpleFeatureConverter; +import org.locationtech.geomesa.kafka.spring.binder.converters.SimpleFeatureConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java similarity index 89% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java index e466ef9105ea..558970012e34 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -15,7 +15,7 @@ import java.util.Map; @ConfigurationProperties(prefix = "spring.cloud.stream.kafka-datastore") -public class KafkaDatastoreBinderConfigurationProperties { + public class KafkaDatastoreBinderConfigurationProperties { public Map binder = new HashMap<>(); public Map getBinder() { diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java similarity index 96% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java index 51abaae3a2d1..5b97b2f3e44b 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.springframework.cloud.stream.binder.ConsumerProperties; import org.springframework.cloud.stream.binder.ProducerProperties; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java similarity index 92% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java index bd1cde486384..acd190c69d21 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; import org.geotools.api.data.FeatureWriter; @@ -69,7 +69,12 @@ protected MessageHandler createProducerMessageHandler( if (sft.isDefined()) { ds.createSchema(sft.get()); } else { - logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName); + try { + ds.getSchema(sfName); + logger.debug("There is no local schema for {}, but we found it in the kds", sfName); + } catch (IOException e) { + logger.error("There is no sft schema {} in the kds {} or locally", sfName, ds.getInfo().getDescription(), e); + } } if (message.getPayload() instanceof SimpleFeature) { diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java similarity index 97% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java index caac64aef182..98e4b4f85944 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java similarity index 96% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java index b5ccbf3c7869..496aef697e56 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore.converters; +package org.locationtech.geomesa.kafka.spring.binder.converters; import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; import org.geotools.api.feature.simple.SimpleFeature; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders index 141a8515bc72..efa66a3b0df4 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders @@ -1,2 +1,2 @@ kafka-datastore:\ -com.ccri.geomesa.binder.kafka.datastore.KafkaDatastoreBinderConfiguration \ No newline at end of file +org.locationtech.geomesa.kafka.spring.binder.KafkaDatastoreBinderConfiguration \ No newline at end of file diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java similarity index 74% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java index 9bcc0bfe48dd..077a5ef1f382 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; import org.geotools.api.data.FeatureWriter; @@ -15,12 +15,10 @@ import org.geotools.api.feature.simple.SimpleFeatureType; import org.geotools.api.filter.Filter; import org.geotools.feature.simple.SimpleFeatureBuilder; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.Before; +import org.junit.Test; import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.messaging.Message; @@ -31,29 +29,33 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Date; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; -@ExtendWith(MockitoExtension.class) -class KafkaDatastoreMessageBinderTest { +public class KafkaDatastoreMessageBinderTest { - @Mock DataStore ds; - @Mock FeatureWriter featureWriter; - @Mock SimpleFeatureStore simpleFeatureStore; - @Mock SubscribableChannel errorChannel; - - Supplier dsFactory = () -> ds; + Supplier dsFactory; + + @Before + public void init() { + ds = mock(DataStore.class); + featureWriter = mock(FeatureWriter.class); + simpleFeatureStore = mock(SimpleFeatureStore.class); + errorChannel = mock(SubscribableChannel.class); + dsFactory = () -> ds; + } @Test public void producerMessageHandler_canWriteSft() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); @@ -70,7 +72,7 @@ public void producerMessageHandler_canWriteSft() throws IOException { SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); - simpleFeature.setAttribute("mmsi", 123456); + simpleFeature.setAttribute("id", "123456"); Message message = MessageBuilder.withPayload(simpleFeature) .setHeader("featureType", "application/simple-feature") .build(); @@ -78,11 +80,12 @@ public void producerMessageHandler_canWriteSft() throws IOException { handler.handleMessage(message); verify(featureWriter).write(); - assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456); + assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456"); } @Test public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOException { + var now = Instant.now(); SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); @@ -99,8 +102,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); - simpleFeature.setAttribute("mmsi", 123456); - simpleFeature.setAttribute("elevation", 100); + simpleFeature.setAttribute("id", "123456"); + simpleFeature.setAttribute("dtg", now); KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); Message message = MessageBuilder.withPayload(changed) .setHeader("featureType", "application/kafka-feature-event") @@ -109,8 +112,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE handler.handleMessage(message); verify(featureWriter).write(); - assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456); - assertThat(writeableFeature.getAttribute("elevation")).isEqualTo(100f); + assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456"); + assertThat(writeableFeature.getAttribute("dtg")).isEqualTo(Date.from(now)); } @Test @@ -165,4 +168,33 @@ public void producerMessageHandler_canWriteKafkaFeatureEventClear() throws IOExc verify(simpleFeatureStore).removeFeatures(Filter.INCLUDE); } + @Test + public void producerMessageHandler_loadSftFromClasspath() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("observation", producerProperties); + + // Doesn't throw an error + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("id", "123456"); + KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(changed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(ds).createSchema(any()); + } + } \ No newline at end of file