diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml
index b27a34e0f48b..cf487d828cb5 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml
@@ -13,8 +13,6 @@
GeoMesa Kafka Spring Cloud Stream Binder
- 11
- 11
UTF-8
2021.0.9
2.7.18
@@ -32,7 +30,14 @@
org.springframework.boot
spring-boot-starter-log4j2
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+
org.apache.curator
@@ -62,11 +67,29 @@
org.apache.logging.log4j
log4j-to-slf4j
+
+ ch.qos.logback
+ logback-classic
+
+
+ org.junit.jupiter
+ junit-jupiter
+
- org.junit.vintage
- junit-vintage-engine
+ junit
+ junit
+ test
+
+
+
+
+
+
+
+ org.slf4j
+ slf4j-reload4j
test
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java
similarity index 94%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java
index dfd5cf4db7bb..e7bb93592c1c 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java
@@ -6,11 +6,11 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore;
+package org.locationtech.geomesa.kafka.spring.binder;
import org.geotools.api.data.DataStore;
import org.geotools.api.data.DataStoreFinder;
-import org.locationtech.geomesa.spring.binder.kafka.datastore.converters.SimpleFeatureConverter;
+import org.locationtech.geomesa.kafka.spring.binder.converters.SimpleFeatureConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java
similarity index 89%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java
index e466ef9105ea..558970012e34 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java
@@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore;
+package org.locationtech.geomesa.kafka.spring.binder;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -15,7 +15,7 @@
import java.util.Map;
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka-datastore")
-public class KafkaDatastoreBinderConfigurationProperties {
+ public class KafkaDatastoreBinderConfigurationProperties {
public Map binder = new HashMap<>();
public Map getBinder() {
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java
similarity index 96%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java
index 51abaae3a2d1..5b97b2f3e44b 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java
@@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore;
+package org.locationtech.geomesa.kafka.spring.binder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java
similarity index 92%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java
index bd1cde486384..acd190c69d21 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java
@@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore;
+package org.locationtech.geomesa.kafka.spring.binder;
import org.geotools.api.data.DataStore;
import org.geotools.api.data.FeatureWriter;
@@ -69,7 +69,12 @@ protected MessageHandler createProducerMessageHandler(
if (sft.isDefined()) {
ds.createSchema(sft.get());
} else {
- logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName);
+ try {
+ ds.getSchema(sfName);
+ logger.debug("There is no local schema for {}, but we found it in the kds", sfName);
+ } catch (IOException e) {
+ logger.error("There is no sft schema {} in the kds {} or locally", sfName, ds.getInfo().getDescription(), e);
+ }
}
if (message.getPayload() instanceof SimpleFeature) {
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java
similarity index 97%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java
index caac64aef182..98e4b4f85944 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java
@@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore;
+package org.locationtech.geomesa.kafka.spring.binder;
import org.geotools.api.data.DataStore;
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java
similarity index 96%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java
index b5ccbf3c7869..496aef697e56 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java
@@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore.converters;
+package org.locationtech.geomesa.kafka.spring.binder.converters;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent;
import org.geotools.api.feature.simple.SimpleFeature;
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders
index 141a8515bc72..efa66a3b0df4 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders
@@ -1,2 +1,2 @@
kafka-datastore:\
-com.ccri.geomesa.binder.kafka.datastore.KafkaDatastoreBinderConfiguration
\ No newline at end of file
+org.locationtech.geomesa.kafka.spring.binder.KafkaDatastoreBinderConfiguration
\ No newline at end of file
diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java
similarity index 74%
rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java
rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java
index 9bcc0bfe48dd..077a5ef1f382 100644
--- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java
+++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java
@@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/
-package org.locationtech.geomesa.spring.binder.kafka.datastore;
+package org.locationtech.geomesa.kafka.spring.binder;
import org.geotools.api.data.DataStore;
import org.geotools.api.data.FeatureWriter;
@@ -15,12 +15,10 @@
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.filter.Filter;
import org.geotools.feature.simple.SimpleFeatureBuilder;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.Before;
+import org.junit.Test;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.messaging.Message;
@@ -31,29 +29,33 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Date;
import java.util.function.Supplier;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
-@ExtendWith(MockitoExtension.class)
-class KafkaDatastoreMessageBinderTest {
+public class KafkaDatastoreMessageBinderTest {
- @Mock
DataStore ds;
- @Mock
FeatureWriter featureWriter;
- @Mock
SimpleFeatureStore simpleFeatureStore;
- @Mock
SubscribableChannel errorChannel;
-
- Supplier dsFactory = () -> ds;
+ Supplier dsFactory;
+
+ @Before
+ public void init() {
+ ds = mock(DataStore.class);
+ featureWriter = mock(FeatureWriter.class);
+ simpleFeatureStore = mock(SimpleFeatureStore.class);
+ errorChannel = mock(SubscribableChannel.class);
+ dsFactory = () -> ds;
+ }
@Test
public void producerMessageHandler_canWriteSft() throws IOException {
+
SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get();
SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0");
doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any());
@@ -70,7 +72,7 @@ public void producerMessageHandler_canWriteSft() throws IOException {
SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1");
- simpleFeature.setAttribute("mmsi", 123456);
+ simpleFeature.setAttribute("id", "123456");
Message> message = MessageBuilder.withPayload(simpleFeature)
.setHeader("featureType", "application/simple-feature")
.build();
@@ -78,11 +80,12 @@ public void producerMessageHandler_canWriteSft() throws IOException {
handler.handleMessage(message);
verify(featureWriter).write();
- assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456);
+ assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456");
}
@Test
public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOException {
+ var now = Instant.now();
SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get();
SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0");
doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any());
@@ -99,8 +102,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE
SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1");
- simpleFeature.setAttribute("mmsi", 123456);
- simpleFeature.setAttribute("elevation", 100);
+ simpleFeature.setAttribute("id", "123456");
+ simpleFeature.setAttribute("dtg", now);
KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond());
Message> message = MessageBuilder.withPayload(changed)
.setHeader("featureType", "application/kafka-feature-event")
@@ -109,8 +112,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE
handler.handleMessage(message);
verify(featureWriter).write();
- assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456);
- assertThat(writeableFeature.getAttribute("elevation")).isEqualTo(100f);
+ assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456");
+ assertThat(writeableFeature.getAttribute("dtg")).isEqualTo(Date.from(now));
}
@Test
@@ -165,4 +168,33 @@ public void producerMessageHandler_canWriteKafkaFeatureEventClear() throws IOExc
verify(simpleFeatureStore).removeFeatures(Filter.INCLUDE);
}
+ @Test
+ public void producerMessageHandler_loadSftFromClasspath() throws IOException {
+ SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get();
+ SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0");
+ doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any());
+ doReturn(writeableFeature).when(featureWriter).next();
+
+ KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner();
+ provisioningProvider = new KafkaDatastoreBinderProvisioner();
+ KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory);
+
+ ProducerProperties producerProperties = new ProducerProperties();
+ ProducerDestination destination = provisioningProvider.provisionProducerDestination("observation", producerProperties);
+
+ // Doesn't throw an error
+ MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel);
+
+ SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1");
+ simpleFeature.setAttribute("id", "123456");
+ KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond());
+ Message> message = MessageBuilder.withPayload(changed)
+ .setHeader("featureType", "application/kafka-feature-event")
+ .build();
+
+ handler.handleMessage(message);
+
+ verify(ds).createSchema(any());
+ }
+
}
\ No newline at end of file