Skip to content

refactor: fix code semells #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
class CommandsProcessPerfTest {

private static final String COMMAND_NAME = "app.command.test";
private static final int messageCount = 40000;
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);
private static final CountDownLatch latch = new CountDownLatch(12 + 1);

@@ -41,18 +41,18 @@ class CommandsProcessPerfTest {
@Test
void commandShouldArrive() throws InterruptedException {
final long init_p = System.currentTimeMillis();
createMessages(messageCount);
createMessages(MESSAGE_COUNT);
final long end_p = System.currentTimeMillis() - init_p;
System.out.println("Total Publication Time: " + end_p + "ms");

latch.countDown();
final long init = System.currentTimeMillis();
semaphore.acquire(messageCount);
semaphore.acquire(MESSAGE_COUNT);
final long end = System.currentTimeMillis();

final long total = end - init;
final double microsPerMessage = ((total + 0.0) / messageCount) * 1000;
System.out.println("Message count: " + messageCount);
final double microsPerMessage = ((total + 0.0) / MESSAGE_COUNT) * 1000;
System.out.println("Message count: " + MESSAGE_COUNT);
System.out.println("Total Execution Time: " + total + "ms");
System.out.println("Microseconds per message: " + microsPerMessage + "us");
if (System.getProperty("env.ci") == null) {
@@ -82,7 +82,10 @@ public static void main(String[] args) {

@Bean
public HandlerRegistry registry() {
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
final HandlerRegistry registry = range(0, 20)
.reduce(HandlerRegistry.register(), (r, i) ->
r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class))
.block();
return registry
.handleCommand(COMMAND_NAME, this::handleSimple, DummyMessage.class);
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
class DirectGatewayPerfTest {

private static final String COMMAND_NAME = "app.command.test";
private static final int messageCount = 40000;
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);

@Autowired
@@ -36,17 +36,17 @@ class DirectGatewayPerfTest {

@Test
void shouldSendInOptimalTime() throws InterruptedException {
final Flux<Command<DummyMessage>> messages = createMessages(messageCount);
final Flux<Command<DummyMessage>> messages = createMessages(MESSAGE_COUNT);
final Flux<Void> target = messages.flatMap(dummyMessageCommand ->
gateway.sendCommand(dummyMessageCommand, appName)
.doOnSuccess(aVoid -> semaphore.release()));

final long init = System.currentTimeMillis();
target.subscribe();
semaphore.acquire(messageCount);
semaphore.acquire(MESSAGE_COUNT);
final long end = System.currentTimeMillis();

assertMessageThroughput(end - init, messageCount, 200);
assertMessageThroughput(end - init, MESSAGE_COUNT, 200);
}

@Test
@@ -67,8 +67,10 @@ void shouldSendBatchInOptimalTime1Channel() throws InterruptedException {
private void shouldSendBatchInOptimalTimeNChannels(int channels) throws InterruptedException {
List<Mono<Void>> subs = new ArrayList<>(channels);
for (int i = 0; i < channels; ++i) {
final Flux<Command<DummyMessage>> messages = createMessages(messageCount / channels);
final Mono<Void> target = gateway.sendCommands(messages, appName).then().doOnSuccess(_v -> semaphore.release());
final Flux<Command<DummyMessage>> messages = createMessages(MESSAGE_COUNT / channels);
final Mono<Void> target = gateway.sendCommands(messages, appName)
.then()
.doOnSuccess(_v -> semaphore.release());
subs.add(target);
}

@@ -79,7 +81,7 @@ private void shouldSendBatchInOptimalTimeNChannels(int channels) throws Interrup
final long end = System.currentTimeMillis();

final long total = end - init;
assertMessageThroughput(total, messageCount, 230);
assertMessageThroughput(total, MESSAGE_COUNT, 230);
}

private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
@@ -94,7 +96,9 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
}

private Flux<Command<DummyMessage>> createMessages(int count) {
final List<Command<DummyMessage>> commands = IntStream.range(0, count).mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage())).collect(Collectors.toList());
final List<Command<DummyMessage>> commands = IntStream.range(0, count)
.mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage()))
.collect(Collectors.toList());
return Flux.fromIterable(commands);
}

Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.time.Duration;
@@ -35,28 +35,26 @@ class DynamicRegistryTest {

@Test
void shouldReceiveResponse() {
UnicastProcessor<String> result = UnicastProcessor.create();
DomainEventHandler<String> fn = message -> fromRunnable(() -> result.onNext(message.getData()));
Sinks.Many<String> result = Sinks.many().unicast().onBackpressureBuffer();
DomainEventHandler<String> fn = message -> fromRunnable(
() -> result.emitNext(message.getData(), Sinks.EmitFailureHandler.FAIL_FAST)
);

dynamicRegistry.listenEvent("test.event", fn, String.class).block();
final Publisher<Void> emit = eventBus.emit(new DomainEvent<>("test.event", "42", "Hello"));
from(emit).block();

StepVerifier.create(result.next().timeout(Duration.ofSeconds(10)))
StepVerifier.create(result.asFlux().next().timeout(Duration.ofSeconds(10)))
.expectNext("Hello")
.verifyComplete();


}


@SpringBootApplication
@EnableMessageListeners
@EnableDomainEventBus
static class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}

}
}
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
class QueryProcessPerfTest {

private static final String QUERY_NAME = "app.command.test";
private static final int messageCount = 40000;
private static final int MESSAGE_COUNT = 40000;
private static final Semaphore semaphore = new Semaphore(0);
private static final AtomicLong atomicLong = new AtomicLong(0);
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
@@ -44,19 +44,20 @@ class QueryProcessPerfTest {

@Test
void serveQueryPerformanceTest() throws InterruptedException {
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(messageCount);
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(MESSAGE_COUNT);

final long init = System.currentTimeMillis();
messages
.flatMap(dummyMessageAsyncQuery -> gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
.doOnNext(s -> semaphore.release())
.flatMap(dummyMessageAsyncQuery ->
gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
.doOnNext(s -> semaphore.release())
)
.subscribe();
semaphore.acquire(messageCount);
semaphore.acquire(MESSAGE_COUNT);
final long end = System.currentTimeMillis();

final long total = end - init;
assertMessageThroughput(total, messageCount, 200);
assertMessageThroughput(total, MESSAGE_COUNT, 200);
}

private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
@@ -72,7 +73,9 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro


private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count).mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage())).collect(Collectors.toList());
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count)
.mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage()))
.collect(Collectors.toList());
return Flux.fromIterable(queryList);
}

@@ -87,7 +90,11 @@ public static void main(String[] args) {

@Bean
public HandlerRegistry registry() {
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
final HandlerRegistry registry = range(0, 20)
.reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand(
"app.command.name" + i, message -> Mono.empty(), Map.class
))
.block();
return registry
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
}
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.time.Duration;
@@ -36,55 +36,52 @@ class SimpleDirectCommunicationTest {
@Value("${spring.application.name}")
private String appName;

@Autowired
private UnicastProcessor<Command<Long>> listener;

private String commandId = ThreadLocalRandom.current().nextInt() + "";
private Long data = ThreadLocalRandom.current().nextLong();
private final String commandId = ThreadLocalRandom.current().nextInt() + "";
private final Long data = ThreadLocalRandom.current().nextLong();

@Test
void commandShouldArrive() {
Command<Long> command = new Command<>(COMMAND_NAME, commandId, data);
gateway.sendCommand(command, appName).subscribe();
Sinks.Many<Command<Long>> listener = Sinks.many().unicast().onBackpressureBuffer();

StepVerifier.create(listener.next()).assertNext(cmd -> {
StepVerifier.create(listener.asFlux().next()).assertNext(cmd -> {
assertThat(cmd).extracting(Command::getCommandId, Command::getData, Command::getName)
.containsExactly(commandId, data, COMMAND_NAME);
.containsExactly(commandId, data, COMMAND_NAME);
}).verifyComplete();
}

@Test
void shouldReceiveResponse() {
final Mono<Integer> reply = gateway.requestReply(new AsyncQuery<>("double", 42), appName, Integer.class);
StepVerifier.create(reply.timeout(Duration.ofSeconds(15)))
.expectNext(42*2)
.verifyComplete();
.expectNext(42 * 2)
.verifyComplete();
}


@SpringBootApplication
@EnableDirectAsyncGateway
@EnableMessageListeners
static class App{
static class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}

@Bean
public HandlerRegistry registry(UnicastProcessor<Command<Long>> listener) {
public HandlerRegistry registry(Sinks.Many<Command<Long>> listener) {
return HandlerRegistry.register()
.serveQuery("double", rqt -> just(rqt*2), Long.class)
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
.serveQuery("double", rqt -> just(rqt * 2), Long.class)
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
}

@Bean
public UnicastProcessor<Command<Long>> listener() {
return UnicastProcessor.create();
public Sinks.Many<Command<Long>> listener() {
return Sinks.many().unicast().onBackpressureBuffer();
}

private DomainCommandHandler<Long> handle(UnicastProcessor<Command<Long>> listener) {
private DomainCommandHandler<Long> handle(Sinks.Many<Command<Long>> listener) {
return command -> {
listener.onNext(command);
listener.emitNext(command, Sinks.EmitFailureHandler.FAIL_FAST);
return empty();
};
}
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.util.concurrent.ThreadLocalRandom;
@@ -30,17 +30,15 @@ class SimpleEventNotificationTest {
@Autowired
private DomainEventBus eventBus;

@Autowired
private UnicastProcessor<DomainEvent<Long>> listener;

private String eventId = ThreadLocalRandom.current().nextInt() + "";
private Long data = ThreadLocalRandom.current().nextLong();
private final String eventId = ThreadLocalRandom.current().nextInt() + "";
private final Long data = ThreadLocalRandom.current().nextLong();

@Test
void shouldReceiveEvent() throws InterruptedException {
DomainEvent<?> event = new DomainEvent<>(EVENT_NAME, eventId, data);
Sinks.Many<DomainEvent<Long>> listener = Sinks.many().unicast().onBackpressureBuffer();
from(eventBus.emit(event)).subscribe();
StepVerifier.create(listener.take(1)).assertNext(evt ->
StepVerifier.create(listener.asFlux().take(1)).assertNext(evt ->
assertThat(evt).extracting(DomainEvent::getName, DomainEvent::getEventId, DomainEvent::getData)
.containsExactly(EVENT_NAME, eventId, data)
).verifyComplete();
@@ -56,20 +54,20 @@ public static void main(String[] args) {
}

@Bean
public HandlerRegistry registry(UnicastProcessor<DomainEvent<Long>> listener) {
public HandlerRegistry registry(Sinks.Many<DomainEvent<Long>> listener) {
return HandlerRegistry.register()
.serveQuery("double", rqt -> just(rqt * 2), Long.class)
.listenEvent(EVENT_NAME, handle(listener), Long.class);
}

@Bean
public UnicastProcessor<DomainEvent<Long>> listener() {
return UnicastProcessor.create();
public Sinks.Many<DomainEvent<Long>> listener() {
return Sinks.many().unicast().onBackpressureBuffer();
}

private DomainEventHandler<Long> handle(UnicastProcessor<DomainEvent<Long>> listener) {
private DomainEventHandler<Long> handle(Sinks.Many<DomainEvent<Long>> listener) {
return command -> {
listener.onNext(command);
listener.emitNext(command, Sinks.EmitFailureHandler.FAIL_FAST);
return empty();
};
}
Loading