Skip to content

Commit

Permalink
feat: improve parallel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
mvallim committed Mar 4, 2024
1 parent 1fc1e99 commit ab077cf
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package com.amazon.sns.messaging.lib.core;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
Expand All @@ -33,6 +38,8 @@
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
abstract class AbstractAmazonSnsProducer<E> {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class);

private final ConcurrentMap<String, ListenableFutureRegistry> pendingRequests;

private final BlockingQueue<RequestEntry<E>> topicRequests;
Expand All @@ -41,20 +48,27 @@ abstract class AbstractAmazonSnsProducer<E> {

@SneakyThrows
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry) {
final ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> trackPendingRequest = trackPendingRequest(requestEntry.getId());
CompletableFuture.runAsync(() -> enqueueRequest(requestEntry), executorService);
return trackPendingRequest;
return CompletableFuture.supplyAsync(() -> enqueueRequest(requestEntry), executorService).get();
}

@SneakyThrows
private void enqueueRequest(final RequestEntry<E> requestEntry) {
topicRequests.put(requestEntry);
public void shutdown() {
LOGGER.warn("Shutdown producer {}", getClass().getSimpleName());

executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("Executor service did not terminate in the specified time.");
final List<Runnable> droppedTasks = executorService.shutdownNow();
LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
}
}

private ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> trackPendingRequest(final String correlationId) {
final ListenableFutureRegistry listenableFuture = new ListenableFutureRegistry();
pendingRequests.put(correlationId, listenableFuture);
return listenableFuture;
@SneakyThrows
private ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> enqueueRequest(final RequestEntry<E> requestEntry) {
final ListenableFutureRegistry trackPendingRequest = new ListenableFutureRegistry();
pendingRequests.put(requestEntry.getId(), trackPendingRequest);
topicRequests.put(requestEntry);
return trackPendingRequest;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final Requ
}

public void shutdown() {
amazonSnsProducer.shutdown();
amazonSnsConsumer.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface ListenableFuture<S, F> {
void addCallback(final Consumer<? super S> successCallback, final Consumer<? super F> failureCallback);

default void addCallback(final Consumer<? super S> successCallback) {
addCallback(successCallback, null);
addCallback(successCallback, result -> { });
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import com.amazonaws.services.sns.model.PublishBatchResult;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.SneakyThrows;

// @formatter:off
@SuppressWarnings("java:S6204")
class AmazonSnsConsumer<E> extends AbstractAmazonSnsConsumer<AmazonSNS, PublishBatchRequest, PublishBatchResult, E> {
Expand Down Expand Up @@ -98,7 +96,6 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final
}

@Override
@SneakyThrows
protected void handleResponse(final PublishBatchResult publishBatchResult) {
publishBatchResult.getSuccessful().forEach(entry -> {
final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.core.jmh;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

import com.amazon.sns.messaging.lib.core.AmazonSnsTemplate;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.TopicProperty;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.PublishBatchRequest;
import com.amazonaws.services.sns.model.PublishBatchResult;
import com.amazonaws.services.sns.model.PublishBatchResultEntry;

// @formatter:off
@Fork(5)
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
public class AmazonSnsTemplateBenchmark {

private AmazonSnsTemplate<Integer> amazonSnsTemplate;

@Setup
public void setup() {
final AmazonSNS amazonSNS = mock(AmazonSNS.class);

when(amazonSNS.publishBatch(any())).thenAnswer(invocation -> {
final PublishBatchRequest request = invocation.getArgument(0, PublishBatchRequest.class);
final List<PublishBatchResultEntry> resultEntries = request.getPublishBatchRequestEntries().stream()
.map(entry -> new PublishBatchResultEntry().withId(entry.getId()))
.collect(Collectors.toList());
return new PublishBatchResult().withSuccessful(resultEntries);
});

final TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(70)
.maxBatchSize(10)
.maximumPoolSize(512)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();

amazonSnsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty);
}

@TearDown
public void tearDown() {
amazonSnsTemplate.await().join();
amazonSnsTemplate.shutdown();
}

@Benchmark
@OperationsPerInvocation(1)
public void testSend_1() throws InterruptedException {
amazonSnsTemplate.send(RequestEntry.<Integer>builder().withValue(1).build());
}

@Benchmark
@OperationsPerInvocation(10)
public void testSend_10() throws InterruptedException {
for (int i = 0; i < 10; i++) {
amazonSnsTemplate.send(RequestEntry.<Integer>builder().withValue(i).build());
}
}

@Benchmark
@OperationsPerInvocation(100)
public void testSend_100() throws InterruptedException {
for (int i = 0; i < 100; i++) {
amazonSnsTemplate.send(RequestEntry.<Integer>builder().withValue(i).build());
}
}

@Benchmark
@OperationsPerInvocation(1000)
public void testSend_1000() throws InterruptedException {
for (int i = 0; i < 1000; i++) {
amazonSnsTemplate.send(RequestEntry.<Integer>builder().withValue(i).build());
}
}

@Benchmark
@OperationsPerInvocation(10000)
public void testSend_10000() throws InterruptedException {
for (int i = 0; i < 10000; i++) {
amazonSnsTemplate.send(RequestEntry.<Integer>builder().withValue(i).build());
}
}

}
// @formatter:on
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.core.jmh;

public class AmazonSnsTemplateBenchmarkRunner {

public static void main(final String[] args) throws Exception {
org.openjdk.jmh.Main.main(args);
}

}
17 changes: 17 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<mockito.version>4.11.0</mockito.version>
<junit.jupiter.version>5.10.2</junit.jupiter.version>
<jackson-databind.version>2.15.2</jackson-databind.version>
<jmh.version>1.37</jmh.version>

<maven-clean-plugin.version>3.2.0</maven-clean-plugin.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
Expand Down Expand Up @@ -186,6 +187,17 @@
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down Expand Up @@ -363,6 +375,11 @@
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</annotationProcessorPath>
<annotationProcessorPath>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</annotationProcessorPath>
</annotationProcessorPaths>
</configuration>
</plugin>
Expand Down

0 comments on commit ab077cf

Please sign in to comment.