From ab077cf180a6a99684df2616fddf3237b5ccc3d2 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Mon, 4 Mar 2024 17:56:57 -0300 Subject: [PATCH] feat: improve parallel processing --- .../lib/core/AbstractAmazonSnsProducer.java | 32 +++-- .../lib/core/AbstractAmazonSnsTemplate.java | 1 + .../messaging/lib/core/ListenableFuture.java | 2 +- .../messaging/lib/core/AmazonSnsConsumer.java | 3 - .../core/jmh/AmazonSnsTemplateBenchmark.java | 127 ++++++++++++++++++ .../jmh/AmazonSnsTemplateBenchmarkRunner.java | 25 ++++ pom.xml | 17 +++ 7 files changed, 194 insertions(+), 13 deletions(-) create mode 100644 amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java create mode 100644 amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java index 0ee7162..1cac9e9 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java @@ -16,10 +16,15 @@ package com.amazon.sns.messaging.lib.core; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; @@ -33,6 +38,8 @@ @RequiredArgsConstructor(access = AccessLevel.PROTECTED) abstract class AbstractAmazonSnsProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class); + private final ConcurrentMap pendingRequests; private final BlockingQueue> topicRequests; @@ -41,20 +48,27 @@ abstract class AbstractAmazonSnsProducer { @SneakyThrows public ListenableFuture send(final RequestEntry requestEntry) { - final ListenableFuture trackPendingRequest = trackPendingRequest(requestEntry.getId()); - CompletableFuture.runAsync(() -> enqueueRequest(requestEntry), executorService); - return trackPendingRequest; + return CompletableFuture.supplyAsync(() -> enqueueRequest(requestEntry), executorService).get(); } @SneakyThrows - private void enqueueRequest(final RequestEntry requestEntry) { - topicRequests.put(requestEntry); + public void shutdown() { + LOGGER.warn("Shutdown producer {}", getClass().getSimpleName()); + + executorService.shutdown(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOGGER.warn("Executor service did not terminate in the specified time."); + final List droppedTasks = executorService.shutdownNow(); + LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size()); + } } - private ListenableFuture trackPendingRequest(final String correlationId) { - final ListenableFutureRegistry listenableFuture = new ListenableFutureRegistry(); - pendingRequests.put(correlationId, listenableFuture); - return listenableFuture; + @SneakyThrows + private ListenableFuture enqueueRequest(final RequestEntry requestEntry) { + final ListenableFutureRegistry trackPendingRequest = new ListenableFutureRegistry(); + pendingRequests.put(requestEntry.getId(), trackPendingRequest); + topicRequests.put(requestEntry); + return trackPendingRequest; } } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java index cbc7e66..1a14cb1 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java @@ -39,6 +39,7 @@ public ListenableFuture send(final Requ } public void shutdown() { + amazonSnsProducer.shutdown(); amazonSnsConsumer.shutdown(); } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java index 81562e6..3da35a2 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java @@ -24,7 +24,7 @@ public interface ListenableFuture { void addCallback(final Consumer successCallback, final Consumer failureCallback); default void addCallback(final Consumer successCallback) { - addCallback(successCallback, null); + addCallback(successCallback, result -> { }); } } diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java index b786ee7..a6ecc6e 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -38,8 +38,6 @@ import com.amazonaws.services.sns.model.PublishBatchResult; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.SneakyThrows; - // @formatter:off @SuppressWarnings("java:S6204") class AmazonSnsConsumer extends AbstractAmazonSnsConsumer { @@ -98,7 +96,6 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final } @Override - @SneakyThrows protected void handleResponse(final PublishBatchResult publishBatchResult) { publishBatchResult.getSuccessful().forEach(entry -> { final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.getId()); diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java new file mode 100644 index 0000000..b56864b --- /dev/null +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.core.jmh; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import com.amazon.sns.messaging.lib.core.AmazonSnsTemplate; +import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.TopicProperty; +import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.services.sns.model.PublishBatchRequest; +import com.amazonaws.services.sns.model.PublishBatchResult; +import com.amazonaws.services.sns.model.PublishBatchResultEntry; + +// @formatter:off +@Fork(5) +@State(Scope.Benchmark) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +public class AmazonSnsTemplateBenchmark { + + private AmazonSnsTemplate amazonSnsTemplate; + + @Setup + public void setup() { + final AmazonSNS amazonSNS = mock(AmazonSNS.class); + + when(amazonSNS.publishBatch(any())).thenAnswer(invocation -> { + final PublishBatchRequest request = invocation.getArgument(0, PublishBatchRequest.class); + final List resultEntries = request.getPublishBatchRequestEntries().stream() + .map(entry -> new PublishBatchResultEntry().withId(entry.getId())) + .collect(Collectors.toList()); + return new PublishBatchResult().withSuccessful(resultEntries); + }); + + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .linger(70) + .maxBatchSize(10) + .maximumPoolSize(512) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); + + amazonSnsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); + } + + @TearDown + public void tearDown() { + amazonSnsTemplate.await().join(); + amazonSnsTemplate.shutdown(); + } + + @Benchmark + @OperationsPerInvocation(1) + public void testSend_1() throws InterruptedException { + amazonSnsTemplate.send(RequestEntry.builder().withValue(1).build()); + } + + @Benchmark + @OperationsPerInvocation(10) + public void testSend_10() throws InterruptedException { + for (int i = 0; i < 10; i++) { + amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); + } + } + + @Benchmark + @OperationsPerInvocation(100) + public void testSend_100() throws InterruptedException { + for (int i = 0; i < 100; i++) { + amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); + } + } + + @Benchmark + @OperationsPerInvocation(1000) + public void testSend_1000() throws InterruptedException { + for (int i = 0; i < 1000; i++) { + amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); + } + } + + @Benchmark + @OperationsPerInvocation(10000) + public void testSend_10000() throws InterruptedException { + for (int i = 0; i < 10000; i++) { + amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); + } + } + +} +// @formatter:on diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java new file mode 100644 index 0000000..2454333 --- /dev/null +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.core.jmh; + +public class AmazonSnsTemplateBenchmarkRunner { + + public static void main(final String[] args) throws Exception { + org.openjdk.jmh.Main.main(args); + } + +} diff --git a/pom.xml b/pom.xml index 1fbb238..959da19 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ 4.11.0 5.10.2 2.15.2 + 1.37 3.2.0 3.10.1 @@ -186,6 +187,17 @@ ${logback.version} test + + org.openjdk.jmh + jmh-core + ${jmh.version} + test + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + org.assertj assertj-core @@ -363,6 +375,11 @@ lombok ${lombok.version} + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} +