Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,18 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
batchContextMap.forEach(action);
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
return batchContextMap.get(batchKey).flushableRequests();
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfReady(String batchKey) {
return batchContextMap.get(batchKey).extractBatchIfReady();
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(String batchKey,
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfSizeExceeded(String batchKey,
RequestT request) {
return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request);
return batchContextMap.get(batchKey).extractBatchIfSizeExceeded(request);
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractEntriesForScheduledFlush(String batchKey,
int maxBatchItems) {
return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems);
return batchContextMap.get(batchKey).extractEntriesForScheduledFlush(maxBatchItems);
}

public void cancelScheduledFlush(String batchKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public RequestBatchBuffer(ScheduledFuture<?> scheduledFlush,
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests() {
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfReady() {
synchronized (flushLock) {
return (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached())
? extractFlushedEntries(maxBatchItems)
? extractEntries(maxBatchItems)
: Collections.emptyMap();
}
}
Expand All @@ -77,12 +77,12 @@ private boolean isMaxBatchSizeLimitReached() {
return idToBatchContext.size() >= maxBatchItems;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(RequestT request) {
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfSizeExceeded(RequestT request) {
synchronized (flushLock) {
if (maxBatchSizeInBytes > 0 && !idToBatchContext.isEmpty()) {
int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0);
if (isByteSizeThresholdCrossed(incomingRequestBytes)) {
return extractFlushedEntries(maxBatchItems);
return extractEntries(maxBatchItems);
}
}
return Collections.emptyMap();
Expand All @@ -100,16 +100,16 @@ private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) {
return totalPayloadSize > maxBatchSizeInBytes;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractEntriesForScheduledFlush(int maxBatchItems) {
synchronized (flushLock) {
if (!idToBatchContext.isEmpty()) {
return extractFlushedEntries(maxBatchItems);
return extractEntries(maxBatchItems);
}
return Collections.emptyMap();
}
}

private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractFlushedEntries(int maxBatchItems) {
private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractEntries(int maxBatchItems) {
LinkedHashMap<String, BatchingExecutionContext<RequestT, ResponseT>> requestEntries = new LinkedHashMap<>();
String nextEntry;
while (requestEntries.size() < maxBatchItems && hasNextBatchEntry()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public CompletableFuture<ResponseT> batchRequest(RequestT request) {
String batchKey = getBatchKey(request);
// Handle potential byte size overflow only if there are requests in the map and the feature is enabled
if (requestsAndResponsesMaps.contains(batchKey) && batchConfiguration.maxBatchBytesSize() > 0) {
Optional.of(requestsAndResponsesMaps.flushableRequestsOnByteLimitBeforeAdd(batchKey, request))
.filter(flushableRequests -> !flushableRequests.isEmpty())
.ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests));
Optional.of(requestsAndResponsesMaps.extractBatchIfSizeExceeded(batchKey, request))
.filter(extractedEntries -> !extractedEntries.isEmpty())
.ifPresent(extractedEntries -> manualFlushBuffer(batchKey, extractedEntries));
}

// Add request and response to the map, scheduling a flush if necessary
Expand All @@ -86,9 +86,9 @@ public CompletableFuture<ResponseT> batchRequest(RequestT request) {
response);

// Immediately flush if the batch is full
Optional.of(requestsAndResponsesMaps.flushableRequests(batchKey))
.filter(flushableRequests -> !flushableRequests.isEmpty())
.ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests));
Optional.of(requestsAndResponsesMaps.extractBatchIfReady(batchKey))
.filter(extractedEntries -> !extractedEntries.isEmpty())
.ifPresent(extractedEntries -> manualFlushBuffer(batchKey, extractedEntries));

} catch (Exception e) {
response.completeExceptionally(e);
Expand Down Expand Up @@ -153,21 +153,21 @@ private ScheduledFuture<?> scheduleBufferFlush(String batchKey, long timeOutInMs
}

private void performScheduledFlush(String batchKey) {
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
requestsAndResponsesMaps.flushableScheduledRequests(batchKey, maxBatchItems);
if (!flushableRequests.isEmpty()) {
flushBuffer(batchKey, flushableRequests);
Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractedEntries =
requestsAndResponsesMaps.extractEntriesForScheduledFlush(batchKey, maxBatchItems);
if (!extractedEntries.isEmpty()) {
flushBuffer(batchKey, extractedEntries);
}
}

public void close() {
requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> {
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
requestsAndResponsesMaps.flushableRequests(batchKey);
Map<String, BatchingExecutionContext<RequestT, ResponseT>>
extractedEntries = requestsAndResponsesMaps.extractBatchIfReady(batchKey);

while (!flushableRequests.isEmpty()) {
flushBuffer(batchKey, flushableRequests);
while (!extractedEntries.isEmpty()) {
flushBuffer(batchKey, extractedEntries);
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,23 @@ void whenPutRequestThenBufferContainsRequest() {
}

@Test
void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() {
void whenExtractBatchIfReadyThenReturnRequestsUpToMaxBatchItems() {
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
CompletableFuture<String> response = new CompletableFuture<>();
batchBuffer.put("request1", response);
Map<String, BatchingExecutionContext<String, String>> flushedRequests = batchBuffer.flushableRequests();
assertEquals(1, flushedRequests.size());
assertTrue(flushedRequests.containsKey("0"));
Map<String, BatchingExecutionContext<String, String>> extractedEntries = batchBuffer.extractBatchIfReady();
assertEquals(1, extractedEntries.size());
assertTrue(extractedEntries.containsKey("0"));
}

@Test
void whenFlushableScheduledRequestsThenReturnAllRequests() {
void whenExtractEntriesForScheduledFlushThenReturnAllRequests() {
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
CompletableFuture<String> response = new CompletableFuture<>();
batchBuffer.put("request1", response);
Map<String, BatchingExecutionContext<String, String>> flushedRequests = batchBuffer.flushableScheduledRequests(1);
assertEquals(1, flushedRequests.size());
assertTrue(flushedRequests.containsKey("0"));
Map<String, BatchingExecutionContext<String, String>> extractedEntries = batchBuffer.extractEntriesForScheduledFlush(1);
assertEquals(1, extractedEntries.size());
assertTrue(extractedEntries.containsKey("0"));
}

@Test
Expand Down Expand Up @@ -119,28 +119,28 @@ void whenClearBufferThenBufferIsEmpty() {
}

@Test
void whenExtractFlushedEntriesThenReturnCorrectEntries() {
void whenExtractEntriesThenReturnCorrectEntries() {
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
for (int i = 0; i < 5; i++) {
batchBuffer.put("request" + i, new CompletableFuture<>());
}
Map<String, BatchingExecutionContext<String, String>> flushedEntries = batchBuffer.flushableRequests();
assertEquals(5, flushedEntries.size());
Map<String, BatchingExecutionContext<String, String>> extractedEntries = batchBuffer.extractBatchIfReady();
assertEquals(5, extractedEntries.size());
}

@Test
void whenHasNextBatchEntryThenReturnTrue() {
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
batchBuffer.put("request1", new CompletableFuture<>());
assertTrue(batchBuffer.flushableRequests().containsKey("0"));
assertTrue(batchBuffer.extractBatchIfReady().containsKey("0"));
}


@Test
void whenNextBatchEntryThenReturnNextEntryId() {
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
batchBuffer.put("request1", new CompletableFuture<>());
assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next());
assertEquals("0", batchBuffer.extractBatchIfReady().keySet().iterator().next());
}

@Test
Expand All @@ -151,9 +151,9 @@ void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() {
batchBuffer.put(SendMessageRequest.builder().build(),
new CompletableFuture<>());
}
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> flushedEntries =
batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("Hi").build());
assertEquals(0, flushedEntries.size());
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> extractedEntries =
batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("Hi").build());
assertEquals(0, extractedEntries.size());
}


Expand All @@ -166,9 +166,9 @@ void testFlushWhenPayloadExceedsMaxSize() {
String largeMessageBody = createLargeString('a',245_760);
batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(),
new CompletableFuture<>());
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> flushedEntries =
batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build());
assertEquals(1, flushedEntries.size());
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> extractedEntries =
batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("NewMessage").build());
assertEquals(1, extractedEntries.size());
}

@Test
Expand All @@ -181,11 +181,11 @@ void testFlushWhenCumulativePayloadExceedsMaxSize() {
new CompletableFuture<>());
batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(),
new CompletableFuture<>());
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> flushedEntries =
batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build());
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> extractedEntries =
batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("NewMessage").build());

// Flushes both the messages since their sum is greater than 256 KB
assertEquals(2, flushedEntries.size());
assertEquals(2, extractedEntries.size());
}


Expand Down
Loading