Skip to content

Commit

Permalink
Added a check for sft in the kds, if it isn't on the classpath
Browse files Browse the repository at this point in the history
  • Loading branch information
Walter Schultz committed Jun 6, 2024
1 parent 70bb161 commit a8ecb36
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 34 deletions.
31 changes: 27 additions & 4 deletions geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
<name>GeoMesa Kafka Spring Cloud Stream Binder</name>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud.version>2021.0.9</spring-cloud.version>
<spring-boot.version>2.7.18</spring-boot.version>
Expand All @@ -32,7 +30,14 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- NOTE: needed to work with GeoMesa -->
<dependency>
<groupId>org.apache.curator</groupId>
Expand Down Expand Up @@ -62,11 +67,29 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.junit.vintage</groupId>-->
<!-- <artifactId>junit-vintage-engine</artifactId>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore;
package org.locationtech.geomesa.kafka.spring.binder;

import org.geotools.api.data.DataStore;
import org.geotools.api.data.DataStoreFinder;
import org.locationtech.geomesa.spring.binder.kafka.datastore.converters.SimpleFeatureConverter;
import org.locationtech.geomesa.kafka.spring.binder.converters.SimpleFeatureConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore;
package org.locationtech.geomesa.kafka.spring.binder;

import org.springframework.boot.context.properties.ConfigurationProperties;

Expand All @@ -15,7 +15,7 @@
import java.util.Map;

@ConfigurationProperties(prefix = "spring.cloud.stream.kafka-datastore")
public class KafkaDatastoreBinderConfigurationProperties {
public class KafkaDatastoreBinderConfigurationProperties {
public Map<String, ? extends Serializable> binder = new HashMap<>();

public Map<String, ? extends Serializable> getBinder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore;
package org.locationtech.geomesa.kafka.spring.binder;

import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore;
package org.locationtech.geomesa.kafka.spring.binder;

import org.geotools.api.data.DataStore;
import org.geotools.api.data.FeatureWriter;
Expand Down Expand Up @@ -69,7 +69,12 @@ protected MessageHandler createProducerMessageHandler(
if (sft.isDefined()) {
ds.createSchema(sft.get());
} else {
logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName);
try {
ds.getSchema(sfName);
logger.debug("There is no local schema for {}, but we found it in the kds", sfName);
} catch (IOException e) {
logger.error("There is no sft schema {} in the kds {} or locally", sfName, ds.getInfo().getDescription(), e);
}
}

if (message.getPayload() instanceof SimpleFeature) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore;
package org.locationtech.geomesa.kafka.spring.binder;


import org.geotools.api.data.DataStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore.converters;
package org.locationtech.geomesa.kafka.spring.binder.converters;

import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent;
import org.geotools.api.feature.simple.SimpleFeature;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
kafka-datastore:\
com.ccri.geomesa.binder.kafka.datastore.KafkaDatastoreBinderConfiguration
org.locationtech.geomesa.kafka.spring.binder.KafkaDatastoreBinderConfiguration
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.spring.binder.kafka.datastore;
package org.locationtech.geomesa.kafka.spring.binder;

import org.geotools.api.data.DataStore;
import org.geotools.api.data.FeatureWriter;
Expand All @@ -15,12 +15,10 @@
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.filter.Filter;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.Before;
import org.junit.Test;
import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.messaging.Message;
Expand All @@ -31,29 +29,33 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
class KafkaDatastoreMessageBinderTest {
public class KafkaDatastoreMessageBinderTest {

@Mock
DataStore ds;
@Mock
FeatureWriter featureWriter;
@Mock
SimpleFeatureStore simpleFeatureStore;
@Mock
SubscribableChannel errorChannel;

Supplier<DataStore> dsFactory = () -> ds;
Supplier<DataStore> dsFactory;

@Before
public void init() {
ds = mock(DataStore.class);
featureWriter = mock(FeatureWriter.class);
simpleFeatureStore = mock(SimpleFeatureStore.class);
errorChannel = mock(SubscribableChannel.class);
dsFactory = () -> ds;
}

@Test
public void producerMessageHandler_canWriteSft() throws IOException {

SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get();
SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0");
doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any());
Expand All @@ -70,19 +72,20 @@ public void producerMessageHandler_canWriteSft() throws IOException {


SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1");
simpleFeature.setAttribute("mmsi", 123456);
simpleFeature.setAttribute("id", "123456");
Message<?> message = MessageBuilder.withPayload(simpleFeature)
.setHeader("featureType", "application/simple-feature")
.build();

handler.handleMessage(message);

verify(featureWriter).write();
assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456);
assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456");
}

@Test
public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOException {
var now = Instant.now();
SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get();
SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0");
doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any());
Expand All @@ -99,8 +102,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE


SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1");
simpleFeature.setAttribute("mmsi", 123456);
simpleFeature.setAttribute("elevation", 100);
simpleFeature.setAttribute("id", "123456");
simpleFeature.setAttribute("dtg", now);
KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond());
Message<?> message = MessageBuilder.withPayload(changed)
.setHeader("featureType", "application/kafka-feature-event")
Expand All @@ -109,8 +112,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE
handler.handleMessage(message);

verify(featureWriter).write();
assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456);
assertThat(writeableFeature.getAttribute("elevation")).isEqualTo(100f);
assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456");
assertThat(writeableFeature.getAttribute("dtg")).isEqualTo(Date.from(now));
}

@Test
Expand Down Expand Up @@ -165,4 +168,33 @@ public void producerMessageHandler_canWriteKafkaFeatureEventClear() throws IOExc
verify(simpleFeatureStore).removeFeatures(Filter.INCLUDE);
}

@Test
public void producerMessageHandler_loadSftFromClasspath() throws IOException {
SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get();
SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0");
doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any());
doReturn(writeableFeature).when(featureWriter).next();

KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner();
provisioningProvider = new KafkaDatastoreBinderProvisioner();
KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory);

ProducerProperties producerProperties = new ProducerProperties();
ProducerDestination destination = provisioningProvider.provisionProducerDestination("observation", producerProperties);

// Doesn't throw an error
MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel);

SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1");
simpleFeature.setAttribute("id", "123456");
KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond());
Message<?> message = MessageBuilder.withPayload(changed)
.setHeader("featureType", "application/kafka-feature-event")
.build();

handler.handleMessage(message);

verify(ds).createSchema(any());
}

}

0 comments on commit a8ecb36

Please sign in to comment.