
Commit

[SDCISA-16147, swisspost#583] Quick-n-dirty just waste anothers threa…
hiddenalpha committed Jun 4, 2024
1 parent acbcdac commit ae29409
Showing 5 changed files with 207 additions and 26 deletions.
@@ -2,6 +2,7 @@

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
@@ -12,6 +13,7 @@
import org.slf4j.LoggerFactory;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceConsumer;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.core.http.RequestLoggerFactory;
import org.swisspush.gateleen.core.util.ResponseStatusCodeLogUtil;
import org.swisspush.gateleen.core.util.StatusCode;
@@ -25,6 +27,8 @@
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
@@ -42,45 +46,78 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

private final String streamingPath;
private final GateleenExceptionFactory exceptionFactory;
private final KafkaProducerRepository repository;
private final KafkaTopicExtractor topicExtractor;
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
private KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}

/** Use {@link #builder()} to get an instance. */
KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
ConfigurationResourceManager configurationResourceManager,
KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender,
String configResourceUri,
String streamingPath,
Map<String, Object> properties
) {
super(configurationResourceManager, configResourceUri, "gateleen_kafka_topic_configuration_schema");
this.exceptionFactory = exceptionFactory;
this.repository = repository;
this.kafkaMessageValidator = kafkaMessageValidator;
this.kafkaMessageSender = kafkaMessageSender;
this.streamingPath = streamingPath;
this.properties = properties;

this.topicExtractor = new KafkaTopicExtractor(streamingPath);
this.kafkaProducerRecordBuilder = new KafkaProducerRecordBuilder(vertx, exceptionFactory);
}

public static KafkaHandlerBuilder builder() {
return new KafkaHandlerBuilder();
}

public Future<Void> initialize() {
@@ -140,9 +177,11 @@ public boolean handle(final HttpServerRequest request) {
}

request.bodyHandler(payload -> {
try {
log.debug("incoming kafka message payload: {}", payload);
final List<KafkaProducerRecord<String, String>> kafkaProducerRecords = KafkaProducerRecordBuilder.buildRecords(topic, payload);
log.debug("incoming kafka message payload: {}", payload);
// TODO: refactor away this callback hell (this applies to the COMPLETE
// surrounding method, 'KafkaHandler.handle()', NOT only the lines below).
kafkaProducerRecordBuilder.buildRecords(topic, payload).compose((List<KafkaProducerRecord<String, String>> kafkaProducerRecords) -> {
maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> {
if(validationEvent.succeeded()) {
if(validationEvent.result().isSuccess()) {
@@ -162,9 +201,14 @@ public boolean handle(final HttpServerRequest request) {
respondWith(StatusCode.INTERNAL_SERVER_ERROR, validationEvent.cause().getMessage(), request);
}
});
} catch (ValidationException ve){
respondWith(StatusCode.BAD_REQUEST, ve.getMessage(), request);
}
return null;
}).onFailure((Throwable ex) -> {
if (ex instanceof ValidationException) {
respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request);
return;
}
log.error("TODO error handling", exceptionFactory.newException(ex));
});
});
return true;
}
@@ -0,0 +1,85 @@
package org.swisspush.gateleen.kafka;

import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.configuration.ConfigurationResourceManager;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;

import java.util.Map;

import static org.slf4j.LoggerFactory.getLogger;
import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

public class KafkaHandlerBuilder {

private static final Logger log = getLogger(KafkaHandlerBuilder.class);
private Vertx vertx;
private GateleenExceptionFactory exceptionFactory;
private ConfigurationResourceManager configurationResourceManager;
private KafkaMessageValidator kafkaMessageValidator;
private KafkaProducerRepository repository;
private KafkaMessageSender kafkaMessageSender;
private String configResourceUri;
private String streamingPath;
private Map<String, Object> properties;

/** Use {@link KafkaHandler#builder()} */
KafkaHandlerBuilder() {/**/}

public KafkaHandler build() {
if (vertx == null) throw new NullPointerException("vertx missing");
if (exceptionFactory == null) exceptionFactory = newGateleenThriftyExceptionFactory();
if (repository == null) throw new NullPointerException("kafkaProducerRepository missing");
if (kafkaMessageSender == null) throw new NullPointerException("kafkaMessageSender missing");
if (streamingPath == null) log.warn("no 'streamingPath' given. Are you sure you want none?");
return new KafkaHandler(
vertx, exceptionFactory, configurationResourceManager, kafkaMessageValidator, repository,
kafkaMessageSender, configResourceUri, streamingPath, properties);
}

public KafkaHandlerBuilder withVertx(Vertx vertx) {
this.vertx = vertx;
return this;
}

public KafkaHandlerBuilder withExceptionFactory(GateleenExceptionFactory exceptionFactory) {
this.exceptionFactory = exceptionFactory;
return this;
}

public KafkaHandlerBuilder withConfigurationResourceManager(ConfigurationResourceManager configurationResourceManager) {
this.configurationResourceManager = configurationResourceManager;
return this;
}

public KafkaHandlerBuilder withKafkaMessageValidator(KafkaMessageValidator kafkaMessageValidator) {
this.kafkaMessageValidator = kafkaMessageValidator;
return this;
}

public KafkaHandlerBuilder withRepository(KafkaProducerRepository repository) {
this.repository = repository;
return this;
}

public KafkaHandlerBuilder withKafkaMessageSender(KafkaMessageSender kafkaMessageSender) {
this.kafkaMessageSender = kafkaMessageSender;
return this;
}

public KafkaHandlerBuilder withConfigResourceUri(String configResourceUri) {
this.configResourceUri = configResourceUri;
return this;
}

public KafkaHandlerBuilder withStreamingPath(String streamingPath) {
this.streamingPath = streamingPath;
return this;
}

public KafkaHandlerBuilder withProperties(Map<String, Object> properties) {
this.properties = properties;
return this;
}

}
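
Not part of the commit: a minimal wiring sketch for the new builder, assuming vertx, configurationResourceManager, repository and kafkaMessageSender already exist in the caller's scope; the two URI strings are illustrative placeholders only.

// Sketch only (not in this commit); collaborator instances and the two paths are assumptions.
// exceptionFactory may be omitted: build() falls back to newGateleenThriftyExceptionFactory().
KafkaHandler kafkaHandler = KafkaHandler.builder()
        .withVertx(vertx)
        .withConfigurationResourceManager(configurationResourceManager)
        .withRepository(repository)
        .withKafkaMessageSender(kafkaMessageSender)
        .withConfigResourceUri("/server/admin/v1/kafka/topicsConfig")
        .withStreamingPath("/server/streaming/")
        .build();
kafkaHandler.initialize();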
@@ -1,26 +1,45 @@
package org.swisspush.gateleen.kafka;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.slf4j.Logger;
import org.swisspush.gateleen.core.exception.GateleenExceptionFactory;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.ArrayList;
import java.util.List;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static org.slf4j.LoggerFactory.getLogger;

/**
* Creates {@link KafkaProducerRecord}s by parsing the request payload.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
class KafkaProducerRecordBuilder {

private static final Logger log = getLogger(KafkaProducerRecordBuilder.class);
private static final String RECORDS = "records";
private static final String KEY = "key";
private static final String VALUE = "value";
private static final String HEADERS = "headers";
private final Vertx vertx;
private final GateleenExceptionFactory exceptionFactory;

KafkaProducerRecordBuilder(
Vertx vertx,
GateleenExceptionFactory exceptionFactory
) {
this.vertx = vertx;
this.exceptionFactory = exceptionFactory;
}

/**
* Builds a list of {@link KafkaProducerRecord}s based on the provided payload.
@@ -32,7 +51,40 @@ class KafkaProducerRecordBuilder {
* @return A list of {@link KafkaProducerRecord}s created from the provided payload
* @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.)
*/
static List<KafkaProducerRecord<String, String>> buildRecords(String topic, Buffer payload) throws ValidationException {
Future<List<KafkaProducerRecord<String, String>>> buildRecords(String topic, Buffer payload) {
return Future.<Void>succeededFuture().compose((Void v) -> {
JsonObject payloadObj;
try {
payloadObj = new JsonObject(payload);
} catch (DecodeException de) {
return Future.failedFuture(new ValidationException("Error while parsing payload", de));
}
JsonArray recordsArray;
try {
recordsArray = payloadObj.getJsonArray(RECORDS);
} catch (ClassCastException cce) {
return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects"));
}
if (recordsArray == null) {
return Future.failedFuture(new ValidationException("Missing 'records' array"));
}
return vertx.executeBlocking(() -> {
assert !currentThread().getName().toUpperCase().contains("EVENTLOOP") : currentThread().getName();
long beginEpochMs = currentTimeMillis();
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>(recordsArray.size());
for (int i = 0; i < recordsArray.size(); i++) {
kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i)));
}
long durationMs = currentTimeMillis() - beginEpochMs;
log.debug("Serializing JSON did block thread for {}ms", durationMs);
return kafkaProducerRecords;
});
});
}

/** @deprecated Use {@link #buildRecords(String, Buffer)}. */
@Deprecated
static List<KafkaProducerRecord<String, String>> buildRecordsBlocking(String topic, Buffer payload) throws ValidationException {
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>();
JsonObject payloadObj;
try {
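
Not part of the commit: a condensed sketch of the caller-side pattern for the new Future-returning buildRecords(topic, payload), mirroring the KafkaHandler.handle() change above; recordBuilder, topic, payload, request, respondWith and log are assumed to be in scope.

// Sketch only (not in this commit): callers now compose on the Future instead of
// catching ValidationException around a blocking call.
recordBuilder.buildRecords(topic, payload)
        .onSuccess(records -> {
            // forward the records, e.g. via KafkaMessageSender
        })
        .onFailure(ex -> {
            if (ex instanceof ValidationException) {
                respondWith(StatusCode.BAD_REQUEST, ex.getMessage(), request);
            } else {
                log.error("Failed to build kafka producer records", ex);
            }
        });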
@@ -22,7 +22,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking;

/**
* Test class for the {@link KafkaMessageSender}
@@ -47,7 +47,7 @@ public void sendSingleMessage(TestContext context) throws ValidationException {
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecords(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode()));
buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));

@@ -64,7 +64,7 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationException {
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecords(topic, Buffer.buffer(buildSingleRecordPayload(null).encode()));
buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload(null).encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));

@@ -81,7 +81,7 @@ public void sendMultipleMessages(TestContext context) throws ValidationException
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));
buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));

@@ -105,7 +105,7 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws ValidationException {
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));
buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));
when(producer.send(eq(records.get(1)))).thenReturn(Future.failedFuture("Message with key '" + records.get(1).key() + "' failed."));
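
Not part of the commit: the tests above keep using the synchronous buildRecordsBlocking variant. A hedged sketch of how the new Future-based buildRecords could be exercised with vertx-unit, assuming the test class runs with VertxUnitRunner, has a real Vertx instance in a 'vertx' field, and reusing its existing buildSingleRecordPayload helper:

// Sketch only (not in this commit); 'vertx' and buildSingleRecordPayload(...) are assumptions
// about the surrounding test class.
@Test
public void buildRecordsAsync(TestContext context) {
    KafkaProducerRecordBuilder recordBuilder =
            new KafkaProducerRecordBuilder(vertx, newGateleenThriftyExceptionFactory());
    recordBuilder.buildRecords("myTopic", Buffer.buffer(buildSingleRecordPayload("someKey").encode()))
            .onComplete(context.asyncAssertSuccess(records ->
                    context.assertEquals(1, records.size())));
}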