Skip to content

Commit 5e093a5

Browse files
authored
chore(next): Update starters and extract common reusable code to an aditional module (#119)
* chore(next): Update starters and extract common reusable code to an additional module
1 parent 7c202c5 commit 5e093a5

File tree

46 files changed

+2664
-344
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2664
-344
lines changed

async/async-commons/async-commons.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies {
99

1010
compileOnly 'io.projectreactor:reactor-core'
1111
api 'com.fasterxml.jackson.core:jackson-databind'
12+
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
1213
implementation 'commons-io:commons-io:2.16.1'
1314
implementation 'io.cloudevents:cloudevents-json-jackson:4.0.1'
1415

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
1-
package org.reactivecommons.async.rabbit.config;
1+
package org.reactivecommons.async.commons;
22

33
import lombok.AccessLevel;
44
import lombok.NoArgsConstructor;
5-
import lombok.extern.log4j.Log4j2;
5+
import lombok.extern.java.Log;
66
import org.reactivecommons.async.api.DefaultCommandHandler;
77
import org.reactivecommons.async.api.HandlerRegistry;
88
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
99
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1010
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
11-
import org.reactivecommons.async.commons.HandlerResolver;
1211

1312
import java.util.Map;
1413
import java.util.concurrent.ConcurrentHashMap;
1514
import java.util.concurrent.ConcurrentMap;
15+
import java.util.logging.Level;
1616
import java.util.stream.Stream;
1717

1818
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1919

2020
@NoArgsConstructor(access = AccessLevel.PRIVATE)
21-
@Log4j2
21+
@Log
2222
public class HandlerResolverBuilder {
2323

2424
public static HandlerResolver buildResolver(String domain,
@@ -81,7 +81,7 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
8181
if (r.getDomainEventListeners().containsKey(domain)) {
8282
return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r));
8383
}
84-
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
84+
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
8585
return Stream.empty();
8686
})
8787
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
@@ -102,7 +102,7 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
102102
if (r.getDomainEventListeners().containsKey(domain)) {
103103
return r.getDomainEventListeners().get(domain).stream();
104104
}
105-
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
105+
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
106106
return Stream.empty();
107107
})
108108
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),

async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/json/DefaultObjectMapperSupplier.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.databind.DeserializationFeature;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
56
import io.cloudevents.jackson.JsonFormat;
67

78
public class DefaultObjectMapperSupplier implements ObjectMapperSupplier {
@@ -11,7 +12,8 @@ public ObjectMapper get() {
1112
final ObjectMapper objectMapper = new ObjectMapper();
1213
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
1314
objectMapper.findAndRegisterModules();
14-
objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule()); // TODO: Review if this is necessary
15+
objectMapper.registerModule(new JavaTimeModule());
16+
objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule());
1517
return objectMapper;
1618
}
1719

Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
11
package org.reactivecommons.async.kafka.communications;
22

33
import lombok.AllArgsConstructor;
4-
import org.apache.kafka.clients.consumer.ConsumerConfig;
54
import reactor.core.publisher.Flux;
65
import reactor.kafka.receiver.KafkaReceiver;
76
import reactor.kafka.receiver.ReceiverOptions;
87
import reactor.kafka.receiver.ReceiverRecord;
98

109
import java.util.List;
1110

11+
import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
12+
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
13+
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
14+
1215

1316
@AllArgsConstructor
1417
public class ReactiveMessageListener {
1518
private final ReceiverOptions<String, byte[]> receiverOptions;
1619

1720
public Flux<ReceiverRecord<String, byte[]>> listen(String groupId, List<String> topics) { // Notification events
18-
ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
21+
ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(GROUP_ID_CONFIG, groupId);
1922
return KafkaReceiver.create(options.subscription(topics))
2023
.receive();
2124
}
2225

2326
public int getMaxConcurrency() {
24-
Object property = receiverOptions.consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
27+
Object property = receiverOptions.consumerProperty(MAX_POLL_RECORDS_CONFIG);
2528
if (property instanceof Integer) {
2629
return (int) property;
2730
}
28-
return ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
31+
return DEFAULT_MAX_POLL_RECORDS;
2932
}
3033
}

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/communications/topology/TopologyCreator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@ public class TopologyCreator {
1919
private final AdminClient adminClient;
2020
private final KafkaCustomizations customizations;
2121
private final Map<String, Boolean> existingTopics;
22+
private final boolean checkTopics;
2223

23-
public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations) {
24+
public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations, boolean checkTopics) {
2425
this.adminClient = adminClient;
2526
this.customizations = customizations;
27+
this.checkTopics = checkTopics;
2628
this.existingTopics = getTopics();
2729
}
2830

2931
@SneakyThrows
3032
public Map<String, Boolean> getTopics() {
33+
if (!checkTopics) {
34+
return Map.of();
35+
}
3136
ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS));
3237
return topics.names().get().stream().collect(Collectors.toConcurrentMap(name -> name, name -> true));
3338
}
@@ -68,7 +73,7 @@ protected NewTopic toNewTopic(TopicCustomization customization) {
6873
}
6974

7075
public void checkTopic(String topicName) {
71-
if (!existingTopics.containsKey(topicName)) {
76+
if (checkTopics && !existingTopics.containsKey(topicName)) {
7277
throw new TopicNotFoundException("Topic not found: " + topicName + ". Please create it before send a message.");
7378
// TODO: should refresh topics?? getTopics();
7479
}

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/ApplicationNotificationsListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public ApplicationNotificationsListener(ReactiveMessageListener receiver,
4444

4545
@Override
4646
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
47-
final RegisteredEventListener<Object, Object> handler = resolver.getEventListener(executorPath);
47+
final RegisteredEventListener<Object, Object> handler = resolver.getNotificationListener(executorPath);
4848

4949
Function<Message, Object> converter = resolveConverter(handler);
5050
final EventExecutor<Object> executor = new EventExecutor<>(handler.getHandler(), converter);

async/async-kafka/src/test/java/org/reactivecommons/async/kafka/communications/topology/TopologyCreatorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ void shouldCreateTopics() {
5858
create.complete(null);
5959
doReturn(create).when(createTopicsResult).all();
6060
when(adminClient.createTopics(any())).thenReturn(createTopicsResult);
61-
creator = new TopologyCreator(adminClient, customizations);
61+
creator = new TopologyCreator(adminClient, customizations, true);
6262
// Act
6363
Mono<Void> flow = creator.createTopics(List.of("topic1", "topic2"));
6464
// Assert
@@ -73,7 +73,7 @@ void shouldCheckTopics() {
7373
names.complete(Set.of("topic1", "topic2"));
7474
doReturn(names).when(listTopicsResult).names();
7575
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
76-
creator = new TopologyCreator(adminClient, customizations);
76+
creator = new TopologyCreator(adminClient, customizations, true);
7777
// Act
7878
creator.checkTopic("topic1");
7979
// Assert
@@ -87,7 +87,7 @@ void shouldFailWhenCheckTopics() {
8787
names.complete(Set.of("topic1", "topic2"));
8888
doReturn(names).when(listTopicsResult).names();
8989
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
90-
creator = new TopologyCreator(adminClient, customizations);
90+
creator = new TopologyCreator(adminClient, customizations, true);
9191
// Assert
9292
assertThrows(TopicNotFoundException.class, () ->
9393
// Act

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import com.rabbitmq.client.AMQP;
44
import io.cloudevents.CloudEvent;
5-
import io.cloudevents.core.provider.EventFormatProvider;
6-
import io.cloudevents.jackson.JsonFormat;
75
import lombok.extern.java.Log;
86
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
97
import org.reactivecommons.async.commons.DiscardNotifier;
@@ -34,7 +32,6 @@
3432
import static org.reactivecommons.async.commons.Headers.SERVED_QUERY_ID;
3533

3634
@Log
37-
//TODO: Organizar inferencia de tipos de la misma forma que en comandos y eventos
3835
public class ApplicationQueryListener extends GenericMessageListener {
3936
private final MessageConverter converter;
4037
private final HandlerResolver handlerResolver;

async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ApplicationCommandListenerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import static org.mockito.ArgumentMatchers.eq;
1515
import static org.mockito.ArgumentMatchers.longThat;
1616
import static org.mockito.Mockito.verify;
17-
import static reactor.core.publisher.Mono.*;
17+
import static reactor.core.publisher.Mono.error;
1818

1919
@ExtendWith(MockitoExtension.class)
2020
public class ApplicationCommandListenerTest extends ListenerReporterTestSuperClass{
@@ -34,7 +34,7 @@ void shouldSendErrorMetricToCustomErrorReporter() throws InterruptedException {
3434
final HandlerRegistry registry = HandlerRegistry.register()
3535
.handleCommand("app.command.test", m -> error(new RuntimeException("testEx")), DummyMessage.class);
3636
assertSendErrorToCustomReporter(registry, createSource(Command::getName, command));
37-
verify(errorReporter).reportMetric(eq("command"), eq("app.command.test"), longThat(time -> time > 0 ), eq(false));
37+
verify(errorReporter).reportMetric(eq("command"), eq("app.command.test"), longThat(time -> time >= 0 ), eq(false));
3838
}
3939

4040
@Test

docs/docs/reactive-commons/1-getting-started.md

Lines changed: 142 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
sidebar_position: 1
33
---
44

5+
# Getting Started
6+
57
import Tabs from '@theme/Tabs';
68
import TabItem from '@theme/TabItem';
79

810
<Tabs>
911
<TabItem value="rabbitmq" label="RabbitMQ" default>
10-
# Getting Started
1112

1213
This quick start tutorial sets up a single node RabbitMQ and runs the sample reactive sender and consumer using Reactive
1314
Commons.
@@ -119,7 +120,146 @@ If you want to use it, you should read the [Creating a CloudEvent guide](11-crea
119120

120121
</TabItem>
121122
<TabItem value="kafka" label="Kafka">
122-
Comming soon...
123+
This quick start tutorial sets up a single node Kafka and runs the sample reactive sender and consumer using Reactive
124+
Commons.
125+
126+
## Requirements
127+
128+
You need Java JRE installed (Java 17 or later).
129+
130+
## Start Kafka
131+
132+
Start a Kafka broker on your local machine with all the defaults (e.g. port is 9092).
133+
134+
### Containerized
135+
136+
You can run it with Docker or Podman.
137+
138+
The following docker compose has a Kafka broker, a Zookeeper and a Kafka UI.
139+
140+
docker-compose.yml
141+
```yaml
142+
services:
143+
zookeeper:
144+
image: confluentinc/cp-zookeeper:7.4.1
145+
environment:
146+
ZOOKEEPER_CLIENT_PORT: 2181
147+
ZOOKEEPER_TICK_TIME: 2000
148+
ports:
149+
- "2181:2181"
150+
151+
kafka:
152+
image: confluentinc/cp-kafka:7.4.1
153+
environment:
154+
KAFKA_BROKER_ID: 1
155+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
156+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
157+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
158+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
159+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
160+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
161+
ports:
162+
- "9092:9092"
163+
depends_on:
164+
- zookeeper
165+
166+
kafka-ui:
167+
image: provectuslabs/kafka-ui:latest
168+
environment:
169+
KAFKA_CLUSTERS_0_NAME: local
170+
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
171+
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
172+
ports:
173+
- "8081:8080"
174+
depends_on:
175+
- kafka
176+
```
177+
178+
```shell
179+
docker-compose up
180+
```
181+
182+
You may set in /etc/hosts (or equivalent) the following entry:
183+
184+
```txt
185+
127.0.0.1 kafka
186+
```
187+
188+
To enter the Kafka UI, open your browser and go to `http://localhost:8081`
189+
190+
## Spring Boot Application
191+
192+
The Spring Boot sample publishes and consumes messages with the `DomainEventBus`. This application illustrates how to
193+
configure Reactive Commons using RabbitMQ in a Spring Boot environment.
194+
195+
To build your own application using the Reactive Commons API, you need to include a dependency to Reactive Commons.
196+
197+
### Current version
198+
199+
![Maven metadata URL](https://img.shields.io/maven-metadata/v?metadataUrl=https%3A%2F%2Frepo1.maven.org%2Fmaven2%2Forg%2Freactivecommons%2Fasync-commons-rabbit-starter%2Fmaven-metadata.xml)
200+
201+
### Dependency
202+
203+
```groovy
204+
dependencies {
205+
implementation "org.reactivecommons:async-kafka-starter:<version>"
206+
}
207+
```
208+
209+
### Configuration properties
210+
211+
Also you need to include the name for your app in the `application.properties`, it is important because this value will
212+
be used
213+
to name the application queues inside RabbitMQ:
214+
215+
```properties
216+
spring.application.name=MyAppName
217+
```
218+
219+
Or in your `application.yaml`
220+
221+
```yaml
222+
spring:
223+
application:
224+
name: MyAppName
225+
```
226+
227+
You can set the RabbitMQ connection properties through spring boot with
228+
the [`spring.kafka.*` properties](https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html)
229+
230+
```yaml
231+
spring:
232+
kafka:
233+
bootstrap-servers: localhost:9092
234+
```
235+
236+
You can also set it in runtime for example from a secret, so you can create the `KafkaProperties` bean like:
237+
238+
```java title="org.reactivecommons.async.rabbit.config.RabbitProperties"
239+
240+
@Configuration
241+
public class MyKafkaConfig {
242+
243+
@Bean
244+
@Primary
245+
public KafkaProperties myRCKafkaProperties() {
246+
KafkaProperties properties = new KafkaProperties();
247+
properties.setBootstrapServers(List.of("localhost:9092"));
248+
return properties;
249+
}
250+
}
251+
```
252+
253+
### Multi Broker Instances of Kafka or Multi Domain support
254+
255+
Enables to you the ability to listen events from different domains.
256+
257+
### Cloud Events
258+
259+
Includes the Cloud Events specification.
260+
261+
If you want to use it, you should read the [Creating a CloudEvent guide](11-creating-a-cloud-event.md)
262+
123263
</TabItem>
124264
</Tabs>
125265

0 commit comments

Comments
 (0)