Skip to content

Commit

Permalink
Merge pull request #15 from telekom/fix/nack
Browse files Browse the repository at this point in the history
Properly execute nack() when delivery attempt completes exceptional
  • Loading branch information
mherwig authored Aug 15, 2024
2 parents a172b71 + 5233090 commit 3a9ab34
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static de.telekom.horizon.comet.utils.MessageUtils.isStatusMessage;

Expand Down Expand Up @@ -55,7 +56,7 @@ public CompletableFuture<SendResult<String, String>> onMessage(@NonNull Consumer
return null;
}

log.warn("Received (message) ({}) record at partition {} and offset {} in topic {} with record id {}", MessageType.MESSAGE, record.partition(), record.offset(), record.topic(), record.key());
log.debug("Received (message) ({}) record at partition {} and offset {} in topic {} with record id {}", MessageType.MESSAGE, record.partition(), record.offset(), record.topic(), record.key());

try {
return subscribedEventMessageHandler.handleMessage(record);
Expand All @@ -77,9 +78,8 @@ public CompletableFuture<SendResult<String, String>> onMessage(@NonNull Consumer
@Override
public void onMessage(@NotNull List<ConsumerRecord<String, String>> records, @NotNull Acknowledgment acknowledgment) {
List<String> eventUuids = records.stream().map(ConsumerRecord::key).toList();
log.warn("Received batch of records with event ids [{}]", eventUuids);
try {
var afterDeliveringSendFutures = records.stream().map(this::onMessage).filter(Objects::nonNull).map(CompletableFuture::completedFuture).toList();
var afterDeliveringSendFutures = records.stream().map(this::onMessage).filter(Objects::nonNull).toList();
var sendFuturesArray = afterDeliveringSendFutures.toArray(new CompletableFuture[0]);
log.debug("Create sendFutureArray {} of list {}", sendFuturesArray, afterDeliveringSendFutures);
CompletableFuture.allOf(sendFuturesArray).join();
Expand Down

0 comments on commit 3a9ab34

Please sign in to comment.