Skip to content

chore(kafka): Implement kafka binding with support for Domain Events #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f7ab871
feat(cloudEvents): Add CloudEvent specification to commands and event…
adsanta Jul 24, 2024
e2658a8
adjust unit test due new implementation supporting cloud events
jcanacon Jul 24, 2024
3d6b41b
add single class imports in handler registry test
jcanacon Jul 24, 2024
1d3d9c6
Merge pull request #1 from adsanta/feature/cloudevent-spec-beta-test-1
jcanacon Jul 24, 2024
8fb1eb2
feat(kafka): start kafka binding implementation
juancgalvis Jul 25, 2024
f23274d
feat(kafka): start kafka binding implementation
juancgalvis Jul 25, 2024
abc2b25
feat(cloudEvents): Change interfaces signatures
adsanta Jul 26, 2024
0c1c826
feat(cloudEvents): Add CloudEvents dlq
adsanta Jul 31, 2024
11067d3
Update with master
adsanta Jul 31, 2024
fdb037a
feat(cloudEvents): Add coverage
adsanta Jul 31, 2024
8d7d54c
feat(kafka): Add local retries and emit to dlq topic
juancgalvis Jul 31, 2024
ab36961
feat(kafka): Merge api update
juancgalvis Jul 31, 2024
9435690
fix(shared): and add some refactor of generic classes
juancgalvis Aug 1, 2024
1cb1086
fix(shared): some build errors
juancgalvis Aug 1, 2024
bb13aff
fix(query): fix query definition in handler registry
juancgalvis Aug 6, 2024
1aa0d4f
feat(kafka): Merge api update
juancgalvis Aug 6, 2024
73c9ce5
merge
juancgalvis Aug 6, 2024
2f0ea58
feat(kafka): Add some unit tests
juancgalvis Aug 6, 2024
b6812ae
merge
juancgalvis Aug 9, 2024
99096bd
feat(kafka): starter
juancgalvis Aug 9, 2024
a83cebe
feat(kafka): Add sample and starter config
juancgalvis Aug 9, 2024
8ef72c8
test(listener): Add some unit tests
juancgalvis Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -644,4 +644,5 @@ MigrationBackup/
# End of https://www.toptal.com/developers/gitignore/api/macos,linux,windows,gradle,java,intellij,visualstudio,eclipse
contiperf-report

samples/async/local-example/
samples/async/local-example/
.kafka-env
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import org.junit.jupiter.api.Test;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.async.api.handlers.*;
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
import org.reactivecommons.async.api.handlers.CloudEventHandler;
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
import org.reactivecommons.async.api.handlers.DomainEventHandler;
import org.reactivecommons.async.api.handlers.QueryHandler;
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
Expand Down Expand Up @@ -152,7 +157,7 @@ void handleDomainCommand() {

@Test
void handleCloudEventCommand() {
SomeCloudEventCommandHandler cloudCommandHandler = new SomeCloudEventCommandHandler();
SomeCloudCommandHandler cloudCommandHandler = new SomeCloudCommandHandler();

registry.handleCloudEventCommand(name, cloudCommandHandler);

Expand Down Expand Up @@ -197,7 +202,7 @@ void serveQueryWithLambda() {
@Test
void serveQueryWithTypeInference() {
QueryHandler<SomeDataClass, SomeDataClass> handler = new SomeQueryHandler();
registry.serveQuery(name, handler,SomeDataClass.class);
registry.serveQuery(name, handler, SomeDataClass.class);
assertThat(registry.getHandlers()).anySatisfy(registered -> {
assertThat(registered).extracting(RegisteredQueryHandler::getPath, RegisteredQueryHandler::getQueryClass)
.containsExactly(name, SomeDataClass.class);
Expand Down Expand Up @@ -252,7 +257,7 @@ public Mono<Void> handle(Command<SomeDataClass> message) {
}
}

private static class SomeCloudEventCommandHandler implements CloudCommandHandler {
private static class SomeCloudCommandHandler implements CloudCommandHandler {
@Override
public Mono<Void> handle(CloudEvent message) {
return null;
Expand Down
12 changes: 12 additions & 0 deletions async/async-kafka/async-kafka.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Publishing coordinates for this module's artifact.
ext {
    artifactId = 'async-kafka'
    artifactDescription = 'Async Kafka'
}

dependencies {
    // Reactive Commons contracts and shared infrastructure from this repository.
    api project(':async-commons-api')
    api project(':domain-events-api')
    api project(':async-commons')
    // Reactor-based Kafka client used by the sender/listener implementations.
    api 'io.projectreactor.kafka:reactor-kafka:1.3.23'
    // CloudEvents JSON (Jackson) support for serializing CloudEvent payloads.
    api 'io.cloudevents:cloudevents-json-jackson:4.0.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import lombok.AllArgsConstructor;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;

/**
 * Kafka-backed implementation of {@link DomainEventBus}.
 * Both plain domain events and CloudEvents are delegated to the
 * {@link ReactiveMessageSender}, which performs the actual Kafka emission.
 */
public class KafkaDomainEventBus implements DomainEventBus {

    private final ReactiveMessageSender sender;

    public KafkaDomainEventBus(ReactiveMessageSender sender) {
        this.sender = sender;
    }

    @Override
    public <T> Publisher<Void> emit(DomainEvent<T> event) {
        return sender.send(event);
    }

    @Override
    public Publisher<Void> emit(CloudEvent event) {
        return sender.send(event);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.reactivecommons.async.kafka;

import lombok.Data;
import org.apache.kafka.common.header.Headers;
import org.reactivecommons.async.commons.communications.Message;
import reactor.kafka.receiver.ReceiverRecord;

import java.util.HashMap;
import java.util.Map;

import static org.reactivecommons.async.kafka.converters.json.KafkaJacksonMessageConverter.CONTENT_TYPE;


/**
 * {@link Message} implementation backed by a Kafka record: the raw payload bytes
 * plus the record's key, topic and headers exposed as message properties.
 */
@Data
public class KafkaMessage implements Message {
    private final byte[] body;
    private final Properties properties;

    /** Message properties extracted from a received Kafka record. */
    @Data
    public static class KafkaMessageProperties implements Properties {
        private long contentLength;
        private String key;
        private String topic;
        private Map<String, Object> headers = new HashMap<>();

        @Override
        public String getContentType() {
            // The content type travels as a regular Kafka header.
            return (String) headers.get(CONTENT_TYPE);
        }
    }

    /**
     * Builds a {@code KafkaMessage} from a record delivered by reactor-kafka,
     * copying key, topic, payload length and decoded headers into the properties.
     */
    public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> record) {
        KafkaMessageProperties props = new KafkaMessageProperties();
        props.setHeaders(parseHeaders(record.headers()));
        props.setKey(record.key());
        props.setTopic(record.topic());
        props.setContentLength(record.value().length);
        return new KafkaMessage(record.value(), props);
    }

    /**
     * Decodes each Kafka header value as a String; later duplicates of a header
     * key overwrite earlier ones.
     */
    private static Map<String, Object> parseHeaders(Headers headers) {
        Map<String, Object> decoded = new HashMap<>();
        headers.forEach(h -> decoded.put(h.key(), new String(h.value())));
        return decoded;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.reactivecommons.async.kafka.communications;

import lombok.AllArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

import java.util.List;


/**
 * Thin wrapper around reactor-kafka's {@link KafkaReceiver} that subscribes a
 * consumer group to a set of topics and exposes the configured poll batch size
 * as the listener's maximum concurrency.
 */
@AllArgsConstructor
public class ReactiveMessageListener {
    private final ReceiverOptions<String, byte[]> receiverOptions;

    /**
     * Starts receiving records for the given consumer group on the given topics.
     *
     * @param groupId consumer group id applied on top of the base receiver options
     * @param topics  topics to subscribe to
     * @return hot flux of received records
     */
    public Flux<ReceiverRecord<String, byte[]>> listen(String groupId, List<String> topics) { // Notification events
        ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return KafkaReceiver.create(options.subscription(topics))
                .receive();
    }

    /**
     * Resolves {@code max.poll.records} from the consumer configuration.
     * Kafka properties are frequently supplied as Strings (e.g. from external
     * config), so both Integer and numeric String values are honored; anything
     * else falls back to the Kafka client default.
     */
    public int getMaxConcurrency() {
        Object property = receiverOptions.consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
        if (property instanceof Integer) {
            return (int) property;
        }
        if (property instanceof String) {
            try {
                return Integer.parseInt((String) property);
            } catch (NumberFormatException ignored) {
                // Non-numeric value: use the default below rather than failing.
            }
        }
        return ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.reactivecommons.async.kafka.communications;

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.kafka.KafkaMessage;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Sends messages to Kafka through a small pool of reactive sender pipelines and
 * completes the caller's Mono once the broker acknowledges (or rejects) the record.
 * Correlation between an emitted record and its acknowledgement is done via the
 * record key, which is also used as the SenderRecord correlation metadata.
 */
public class ReactiveMessageSender {
    // Pending sends keyed by record key; completed when the matching SenderResult arrives.
    // NOTE(review): two in-flight sends sharing the same key would overwrite each other's
    // sink here and one confirmation would be lost — confirm keys are unique per send.
    private final ConcurrentHashMap<String, MonoSink<Void>> confirmations = new ConcurrentHashMap<>();
    // One FluxSink per sender pipeline, populated when each pipeline is subscribed in the constructor.
    private final CopyOnWriteArrayList<FluxSink<SenderRecord<String, byte[], String>>> fluxSinks = new CopyOnWriteArrayList<>();
    private final AtomicLong counter = new AtomicLong();

    // Separate pools so that emitting records and processing confirmations never block each other.
    private final ExecutorService executorServiceConfirm = Executors.newFixedThreadPool(13, r -> new Thread(r, "KMessageSender1-" + counter.getAndIncrement()));
    private final ExecutorService executorServiceEmit = Executors.newFixedThreadPool(13, r -> new Thread(r, "KMessageSender2-" + counter.getAndIncrement()));

    // Number of parallel sender pipelines created in the constructor.
    private final int senderCount = 4;

    private final MessageConverter messageConverter;
    private final TopologyCreator topologyCreator;

    /**
     * Wires {@code senderCount} independent send pipelines onto the given KafkaSender.
     * Each pipeline is a Flux fed by one of the fluxSinks; broker results flow back
     * through {@link #confirm(SenderResult)}.
     */
    public ReactiveMessageSender(KafkaSender<String, byte[]> sender, MessageConverter messageConverter,
                                 TopologyCreator topologyCreator) {
        this.messageConverter = messageConverter;
        this.topologyCreator = topologyCreator;
        for (int i = 0; i < senderCount; ++i) {
            Flux<SenderRecord<String, byte[], String>> source = Flux.create(fluxSinks::add);
            sender.send(source)
                    .doOnNext(this::confirm)
                    .subscribe();
        }
    }

    /**
     * Converts the message and emits it on one of the sender pipelines; the returned
     * Mono completes (or errors) when the broker result for this record arrives.
     * Pipeline selection by current time modulo senderCount is effectively an
     * arbitrary spread, not strict round-robin.
     */
    public <V> Mono<Void> send(V message) {
        return Mono.create(sink -> {
            SenderRecord<String, byte[], String> record = createRecord(message);
            confirmations.put(record.key(), sink);
            executorServiceEmit.submit(() -> fluxSinks.get((int) (System.currentTimeMillis() % senderCount)).next(record));
        });
    }

    // Resolves the pending sink for this result's correlation key and completes it
    // off the reactor thread; results with no matching sink are ignored.
    private void confirm(SenderResult<String> result) {
        executorServiceConfirm.submit(() -> {
            MonoSink<Void> sink = confirmations.remove(result.correlationMetadata());
            if (sink != null) {
                if (result.exception() != null) {
                    sink.error(result.exception());
                } else {
                    sink.success();
                }
            }
        });
    }

    // Converts the domain object to a KafkaMessage and wraps it as a SenderRecord,
    // using the message key as correlation metadata.
    private <V> SenderRecord<String, byte[], String> createRecord(V object) {
        KafkaMessage message = (KafkaMessage) messageConverter.toMessage(object);
        ProducerRecord<String, byte[]> record = createProducerRecord(message);
        return SenderRecord.create(record, message.getProperties().getKey()); // TODO: Review for Request-Reply
    }

    // Builds the ProducerRecord (null partition: partitioning left to the producer),
    // first validating that the target topic exists. Header values are serialized
    // via toString() in UTF-8.
    @SneakyThrows
    private ProducerRecord<String, byte[]> createProducerRecord(KafkaMessage message) {
        topologyCreator.checkTopic(message.getProperties().getTopic());

        List<Header> headers = message.getProperties().getHeaders().entrySet().stream()
                .map(entry -> new RecordHeader(entry.getKey(), entry.getValue()
                        .toString().getBytes(StandardCharsets.UTF_8)))
                .collect(Collectors.toList());

        return new ProducerRecord<>(message.getProperties().getTopic(), null,
                message.getProperties().getKey(), message.getBody(), headers);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.reactivecommons.async.kafka.communications.exceptions;

/**
 * Thrown when a message is about to be sent to a Kafka topic that is not known
 * to exist (see TopologyCreator#checkTopic).
 */
public class TopicNotFoundException extends RuntimeException {

    /**
     * @param message detail describing the missing topic
     */
    public TopicNotFoundException(String message) {
        super(message);
    }

    /**
     * Backward-compatible addition: lets callers preserve the underlying cause
     * (e.g. an admin-client failure) instead of dropping it.
     *
     * @param message detail describing the missing topic
     * @param cause   the originating failure
     */
    public TopicNotFoundException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.reactivecommons.async.kafka.communications.topology;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

/**
 * Per-topic configuration overrides (partitions, replication, extra config)
 * consulted when topics are declared.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KafkaCustomizations {
    private Map<String, TopicCustomization> topics = new HashMap<>();

    /**
     * Convenience factory: a fresh customizations set containing a single topic entry.
     */
    public static KafkaCustomizations withTopic(String topic, TopicCustomization customization) {
        return new KafkaCustomizations().addTopic(topic, customization);
    }

    /**
     * Registers (or replaces) the customization for the given topic. Fluent.
     */
    public KafkaCustomizations addTopic(String topic, TopicCustomization customization) {
        topics.put(topic, customization);
        return this;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.kafka.communications.topology;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
 * Declarative settings for a single Kafka topic, applied by the topology
 * creator when the topic is created.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TopicCustomization {
    // Topic name this customization applies to.
    private String topic;
    // Number of partitions; the topology creator passes -1 when unset —
    // NOTE(review): presumably -1 means "broker default", confirm against the AdminClient behavior.
    private int partitions;
    // Replication factor; -1 when unset (same note as partitions).
    private short replicationFactor;
    // Extra topic-level configs (e.g. retention.ms); may be null.
    private Map<String, String> config;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.reactivecommons.async.kafka.communications.topology;

import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.reactivecommons.async.kafka.communications.exceptions.TopicNotFoundException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Declares Kafka topics (with optional per-topic customizations) through the
 * AdminClient and keeps an in-memory cache of topics known to exist, used to
 * fail fast before sending to a missing topic.
 */
public class TopologyCreator {
    public static final int TIMEOUT_MS = 60_000;
    private final AdminClient adminClient;
    private final KafkaCustomizations customizations;
    // Cache of known topics; only ever added to (never invalidated) after construction.
    private final Map<String, Boolean> existingTopics;

    // NOTE(review): the constructor performs a blocking cluster call (getTopics)
    // with a 60s timeout — confirm this is acceptable at startup time.
    public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations) {
        this.adminClient = adminClient;
        this.customizations = customizations;
        this.existingTopics = getTopics();
    }

    /**
     * Lists all topics currently visible in the cluster as a concurrent map.
     * Blocks on the admin future; @SneakyThrows rethrows Interrupted/ExecutionException
     * unchecked — NOTE(review): the interrupt flag is not restored on InterruptedException.
     */
    @SneakyThrows
    public Map<String, Boolean> getTopics() {
        ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS));
        return topics.names().get().stream().collect(Collectors.toConcurrentMap(name -> name, name -> true));
    }

    /**
     * Creates the given topics, applying a customization when one is registered and
     * otherwise defaulting partitions/replication to -1. Successfully created (or
     * already existing) topics are added to the known-topics cache.
     * Note: the default builder instance is shared and mutated per topic — safe here
     * because the Flux pipeline maps elements sequentially.
     */
    public Mono<Void> createTopics(List<String> topics) {
        TopicCustomization.TopicCustomizationBuilder defaultBuilder = TopicCustomization.builder()
                .partitions(-1)
                .replicationFactor((short) -1);

        return Flux.fromIterable(topics)
                .map(topic -> {
                    if (customizations.getTopics().containsKey(topic)) {
                        return customizations.getTopics().get(topic);
                    }
                    return defaultBuilder.topic(topic).build();
                })
                .map(this::toNewTopic)
                .flatMap(this::createTopic)
                .doOnNext(topic -> existingTopics.put(topic.name(), true))
                .then();
    }

    /**
     * Issues the create-topic call; a TopicExistsException from the broker is
     * treated as success so that re-declaring an existing topic is idempotent.
     */
    protected Mono<NewTopic> createTopic(NewTopic topic) {
        return Mono.fromFuture(adminClient.createTopics(List.of(topic))
                        .all()
                        .toCompletionStage()
                        .toCompletableFuture())
                .thenReturn(topic)
                .onErrorResume(TopicExistsException.class, e -> Mono.just(topic));
    }

    /**
     * Maps a customization to a NewTopic, attaching extra configs when present.
     */
    protected NewTopic toNewTopic(TopicCustomization customization) {
        NewTopic topic = new NewTopic(customization.getTopic(), customization.getPartitions(), customization.getReplicationFactor());
        if (customization.getConfig() != null) {
            return topic.configs(customization.getConfig());
        }
        return topic;
    }

    /**
     * Fails fast when the topic is not in the known-topics cache.
     * NOTE(review): the cache is never refreshed, so a topic created externally
     * after startup would still be rejected here (see the inline TODO).
     */
    public void checkTopic(String topicName) {
        if (!existingTopics.containsKey(topicName)) {
            throw new TopicNotFoundException("Topic not found: " + topicName + ". Please create it before send a message.");
            // TODO: should refresh topics?? getTopics();
        }
    }
}
Loading