diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index a2345d9f..e50806a0 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -2,6 +2,7 @@ import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -12,6 +13,7 @@ import org.slf4j.LoggerFactory; import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer; import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.core.http.RequestLoggerFactory; import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil; import org.swisspush.gateleen.core.util.StatusCode; @@ -25,6 +27,8 @@ import java.util.Optional; import java.util.regex.Pattern; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; + /** * Handler class for all Kafka related requests. * @@ -42,20 +46,26 @@ public class KafkaHandler extends ConfigurationResourceConsumer { private final Logger log = LoggerFactory.getLogger(KafkaHandler.class); private final String streamingPath; + private final GateleenExceptionFactory exceptionFactory; private final KafkaProducerRepository repository; private final KafkaTopicExtractor topicExtractor; private final KafkaMessageSender kafkaMessageSender; private final Map properties; + private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder; private KafkaMessageValidator kafkaMessageValidator; private boolean initialized = false; + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { this(configurationResourceManager, null, repository, kafkaMessageSender, configResourceUri, streamingPath); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { @@ -63,6 +73,8 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K configResourceUri, streamingPath, new HashMap<>()); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { @@ -70,10 +82,30 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K configResourceUri, streamingPath, properties); } + /** @deprecated Use {@link #builder()} */ + @Deprecated public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { + this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager, + kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath, + properties); + log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!"); + } + /** Use {@link #builder()} to get an instance. */ + KafkaHandler( + Vertx vertx, + GateleenExceptionFactory exceptionFactory, + ConfigurationResourceManager configurationResourceManager, + KafkaMessageValidator kafkaMessageValidator, + KafkaProducerRepository repository, + KafkaMessageSender kafkaMessageSender, + String configResourceUri, + String streamingPath, + Map properties + ) { super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema"); + this.exceptionFactory = exceptionFactory; this.repository = repository; this.kafkaMessageValidator = kafkaMessageValidator; this.kafkaMessageSender = kafkaMessageSender; @@ -81,6 +113,11 @@ public KafkaHandler(ConfigurationResourceManager configurationResourceManager, K this.properties = properties; this.topicExtractor = new KafkaTopicExtractor(streamingPath); + this.kafkaProducerRecordBuilder = new KafkaProducerRecordBuilder(vertx, exceptionFactory); + } + + public static KafkaHandlerBuilder builder() { + return new KafkaHandlerBuilder(); } public Future initialize() { @@ -140,9 +177,11 @@ public boolean handle(final HttpServerRequest request) { } request.bodyHandler(payload -> { - try { - log.debug("incoming kafka message payload: {}", payload); - final List> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload); + log.debug("incoming kafka message payload: {}", payload); + // TODO refactor away this callback-hell (Counts for the COMPLETE method + // surrounding this line, named 'KafkaHandler.handle()', NOT only + // those lines below). + kafkaProducerRecordBuilder.buildRecords(topic, payload).compose((List> kafkaProducerRecords) -> { maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> { if(validationEvent.succeeded()) { if(validationEvent.result().isSuccess()) { @@ -162,9 +201,14 @@ public boolean handle(final HttpServerRequest request) { respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request); } }); - } catch (ValidationException ve){ - respondWith(StatusCode.BAD_REQUEST, ve.getMessage(), request); - } + return null; + }).onFailure((Throwable ex) -> { + if (ex instanceof ValidationException) { + respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request); + return; + } + log.error("TODO error handling", exceptionFactory.newException(ex)); + }); }); return true; } diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java new file mode 100644 index 00000000..324e2354 --- /dev/null +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandlerBuilder.java @@ -0,0 +1,85 @@ +package org.swisspush.gateleen.kafka; + +import io.vertx.core.Vertx; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; + +import java.util.Map; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; + +public class KafkaHandlerBuilder { + + private static final Logger log = getLogger(KafkaHandlerBuilder.class); + private Vertx vertx; + private GateleenExceptionFactory exceptionFactory; + private ConfigurationResourceManager configurationResourceManager; + private KafkaMessageValidator kafkaMessageValidator; + private KafkaProducerRepository repository; + private KafkaMessageSender kafkaMessageSender; + private String configResourceUri; + private String streamingPath; + private Map properties; + + /** Use {@link KafkaHandler#builder()} */ + KafkaHandlerBuilder() {/**/} + + public KafkaHandler build() { + if (vertx == null) throw new NullPointerException("vertx missing"); + if (exceptionFactory == null) exceptionFactory = newGateleenThriftyExceptionFactory(); + if (repository == null) throw new NullPointerException("kafkaProducerRepository missing"); + if (kafkaMessageSender == null) throw new NullPointerException("kafkaMessageSender missing"); + if (streamingPath == null) log.warn("no 'streamingPath' given. Are you sure you want none?"); + return new KafkaHandler( + vertx, exceptionFactory, configurationResourceManager, kafkaMessageValidator, repository, + kafkaMessageSender, configResourceUri, streamingPath, properties); + } + + public KafkaHandlerBuilder withVertx(Vertx vertx) { + this.vertx = vertx; + return this; + } + + public KafkaHandlerBuilder withExceptionFactory(GateleenExceptionFactory exceptionFactory) { + this.exceptionFactory = exceptionFactory; + return this; + } + + public KafkaHandlerBuilder withConfigurationResourceManager(ConfigurationResourceManager configurationResourceManager) { + this.configurationResourceManager = configurationResourceManager; + return this; + } + + public KafkaHandlerBuilder withKafkaMessageValidator(KafkaMessageValidator kafkaMessageValidator) { + this.kafkaMessageValidator = kafkaMessageValidator; + return this; + } + + public KafkaHandlerBuilder withRepository(KafkaProducerRepository repository) { + this.repository = repository; + return this; + } + + public KafkaHandlerBuilder withKafkaMessageSender(KafkaMessageSender kafkaMessageSender) { + this.kafkaMessageSender = kafkaMessageSender; + return this; + } + + public KafkaHandlerBuilder withConfigResourceUri(String configResourceUri) { + this.configResourceUri = configResourceUri; + return this; + } + + public KafkaHandlerBuilder withStreamingPath(String streamingPath) { + this.streamingPath = streamingPath; + return this; + } + + public KafkaHandlerBuilder withProperties(Map properties) { + this.properties = properties; + return this; + } + +} diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java index 83b3bf62..9c2a7ddb 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java @@ -1,15 +1,23 @@ package org.swisspush.gateleen.kafka; +import io.vertx.core.Future; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.DecodeException; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.kafka.client.producer.KafkaProducerRecord; +import org.slf4j.Logger; +import org.swisspush.gateleen.core.exception.GateleenExceptionFactory; import org.swisspush.gateleen.validation.ValidationException; import java.util.ArrayList; import java.util.List; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.currentThread; +import static org.slf4j.LoggerFactory.getLogger; + /** * Creates {@link KafkaProducerRecord}s by parsing the request payload. * @@ -17,10 +25,21 @@ */ class KafkaProducerRecordBuilder { + private static final Logger log = getLogger(KafkaProducerRecordBuilder.class); private static final String RECORDS = "records"; private static final String KEY = "key"; private static final String VALUE = "value"; private static final String HEADERS = "headers"; + private final Vertx vertx; + private final GateleenExceptionFactory exceptionFactory; + + KafkaProducerRecordBuilder( + Vertx vertx, + GateleenExceptionFactory exceptionFactory + ) { + this.vertx = vertx; + this.exceptionFactory = exceptionFactory; + } /** * Builds a list of {@link KafkaProducerRecord}s based on the provided payload. @@ -32,7 +51,40 @@ class KafkaProducerRecordBuilder { * @return A list of {@link KafkaProducerRecord}s created from the provided payload * @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.) */ - static List> buildRecords(String topic, Buffer payload) throws ValidationException { + Future>> buildRecords(String topic, Buffer payload) { + return Future.succeededFuture().compose((Void v) -> { + JsonObject payloadObj; + try { + payloadObj = new JsonObject(payload); + } catch (DecodeException de) { + return Future.failedFuture(new ValidationException("Error while parsing payload", de)); + } + JsonArray recordsArray; + try { + recordsArray = payloadObj.getJsonArray(RECORDS); + } catch (ClassCastException cce) { + return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects")); + } + if (recordsArray == null) { + return Future.failedFuture(new ValidationException("Missing 'records' array")); + } + return vertx.executeBlocking(() -> { + assert !currentThread().getName().toUpperCase().contains("EVENTLOOP") : currentThread().getName(); + long beginEpchMs = currentTimeMillis(); + List> kafkaProducerRecords = new ArrayList<>(recordsArray.size()); + for (int i = 0; i < recordsArray.size(); i++) { + kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i))); + } + long durationMs = currentTimeMillis() - beginEpchMs; + log.debug("Serializing JSON did block thread for {}ms", durationMs); + return kafkaProducerRecords; + }); + }); + } + + /** @deprecated Use {@link #buildRecords(String, Buffer)}. */ + @Deprecated + static List> buildRecordsBlocking(String topic, Buffer payload) throws ValidationException { List> kafkaProducerRecords = new ArrayList<>(); JsonObject payloadObj; try { diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java index e2ba6751..4dc44624 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java @@ -22,7 +22,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; -import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords; +import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking; /** * Test class for the {@link KafkaMessageSender} @@ -47,7 +47,7 @@ public void sendSingleMessage(TestContext context) throws ValidationException { Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecords(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode())); + buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); @@ -64,7 +64,7 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationEx Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecords(topic, Buffer.buffer(buildSingleRecordPayload(null).encode())); + buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload(null).encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); @@ -81,7 +81,7 @@ public void sendMultipleMessages(TestContext context) throws ValidationException Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); + buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); @@ -105,7 +105,7 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws V Async async = context.async(); String topic = "myTopic"; final List> records = - buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); + buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode())); when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic))); when(producer.send(eq(records.get(1)))).thenReturn(Future.failedFuture("Message with key '" + records.get(1).key() + "' failed.")); diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java index 3cb7a63c..0c03ec09 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java @@ -17,7 +17,7 @@ import java.util.List; -import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords; +import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking; /** * Test class for the {@link KafkaProducerRecordBuilder} @@ -34,34 +34,34 @@ public class KafkaProducerRecordBuilderTest { public void buildRecordsInvalidJson() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Error while parsing payload"); - buildRecords("myTopic", Buffer.buffer("notValidJson")); + buildRecordsBlocking("myTopic", Buffer.buffer("notValidJson")); } @Test public void buildRecordsMissingRecordsArray() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Missing 'records' array"); - buildRecords("myTopic", Buffer.buffer("{}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{}")); } @Test public void buildRecordsNotArray() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects"); - buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}")); } @Test public void buildRecordsInvalidRecordsType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [123]}")); } @Test public void buildRecordsEmptyRecordsArray(TestContext context) throws ValidationException { final List> records = - buildRecords("myTopic", Buffer.buffer("{\"records\": []}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": []}")); context.assertTrue(records.isEmpty()); } @@ -69,42 +69,42 @@ public void buildRecordsEmptyRecordsArray(TestContext context) throws Validation public void buildRecordsInvalidKeyType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'key' must be of type String"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}")); } @Test public void buildRecordsInvalidValueType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'value' must be of type JsonObject"); - buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}")); } @Test public void buildRecordsMissingValue() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'value' is required"); - buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\":[{}]}")); } @Test public void buildRecordsInvalidHeadersType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'headers' must be of type JsonObject"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}")); } @Test public void buildRecordsInvalidHeadersValueType() throws ValidationException { thrown.expect( ValidationException.class ); thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only"); - buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}")); + buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}")); } @Test public void buildRecordsValidNoKeyNoHeaders(TestContext context) throws ValidationException { JsonObject payload = buildPayload(null, null); final List> records = - buildRecords("myTopic", Buffer.buffer(payload.encode())); + buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode())); context.assertFalse(records.isEmpty()); context.assertEquals(1, records.size()); context.assertEquals("myTopic", records.get(0).topic()); @@ -118,7 +118,7 @@ public void buildRecordsValidNoKeyNoHeaders(TestContext context) throws Validati public void buildRecordsValidWithKeyNoHeaders(TestContext context) throws ValidationException { JsonObject payload = buildPayload("someKey", null); final List> records = - buildRecords("myTopic", Buffer.buffer(payload.encode())); + buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode())); context.assertFalse(records.isEmpty()); context.assertEquals(1, records.size()); context.assertEquals("myTopic", records.get(0).topic()); @@ -135,7 +135,7 @@ public void buildRecordsValidWithKeyWithHeaders(TestContext context) throws Vali .add("header_1", "value_1") .add("header_2", "value_2")); final List> records = - buildRecords("myTopic", Buffer.buffer(payload.encode())); + buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode())); context.assertFalse(records.isEmpty()); context.assertEquals(1, records.size()); context.assertEquals("myTopic", records.get(0).topic());