Skip to content

Commit

Permalink
Merge pull request #11 from telekom/fix/deduplication-exception-fix
Browse files Browse the repository at this point in the history
fix: fixed edge-case where events would be set to failed if hazelcast instance is not active
  • Loading branch information
ledex authored Apr 30, 2024
2 parents 1940469 + 5309939 commit cc0868f
Showing 1 changed file with 20 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import brave.Span;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.metrics.MetricNames;
import de.telekom.eni.pandora.horizon.model.event.DeliveryType;
Expand All @@ -23,7 +24,6 @@

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static de.telekom.horizon.comet.utils.MessageUtils.getMessageSource;

Expand Down Expand Up @@ -98,18 +98,13 @@ public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecor
var rootSpan = tracer.startSpanFromKafkaHeaders("consume subscribed message", consumerRecord.headers());
var rootSpanInScope = tracer.withSpanInScope(rootSpan); // try-with-resources not possible because scope closes after try -> we need context in catch

try {
afterStatusSendFuture = handleEvent(subscriptionEventMessage, rootSpan, clientId);
return afterStatusSendFuture;
} catch (Exception unknownException) {
log.error("Unexpected error while handling message for event with id {}. Event will be set to FAILED!", subscriptionEventMessage.getUuid(), unknownException);
rootSpan.error(unknownException);
return stateService.updateState(Status.FAILED, subscriptionEventMessage, unknownException);
} finally {
log.debug("Finished handling message with id {}", consumerRecord.key());
rootSpanInScope.close();
rootSpan.finish();
}
afterStatusSendFuture = handleEvent(subscriptionEventMessage, rootSpan, clientId);

log.debug("Finished handling message with id {}", consumerRecord.key());
rootSpanInScope.close();
rootSpan.finish();

return afterStatusSendFuture;
}

/**
Expand All @@ -120,20 +115,24 @@ public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecor
* @param rootSpan The root span for the operation.
* @param messageSource The message source identified by HorizonComponentId.
* @return CompletableFuture with SendResult based on event handling outcome.
* @throws ExecutionException If an execution error occurs.
* @throws InterruptedException If the operation is interrupted.
*/
public CompletableFuture<SendResult<String, String>> handleEvent(SubscriptionEventMessage subscriptionEventMessage, Span rootSpan, HorizonComponentId messageSource) throws ExecutionException, InterruptedException {
public CompletableFuture<SendResult<String, String>> handleEvent(SubscriptionEventMessage subscriptionEventMessage, Span rootSpan, HorizonComponentId messageSource) {
if (isCircuitBreakerOpenOrChecking(subscriptionEventMessage)) {
rootSpan.annotate("Circuit Breaker open! Set event on WAITING");
return stateService.updateState(Status.WAITING, subscriptionEventMessage, null);
}

String msgUuidOrNull = deDuplicationService.get(subscriptionEventMessage);
boolean isDuplicate = Objects.nonNull(msgUuidOrNull);
if (isDuplicate) {
// circuit breaker is not open AND event is a duplicate
return handleDuplicateEvent(subscriptionEventMessage, msgUuidOrNull);
try {
String msgUuidOrNull = deDuplicationService.get(subscriptionEventMessage);
boolean isDuplicate = Objects.nonNull(msgUuidOrNull);
if (isDuplicate) {
// circuit breaker is not open AND event is a duplicate
return handleDuplicateEvent(subscriptionEventMessage, msgUuidOrNull);
}
} catch (HazelcastInstanceNotActiveException ex) {
log.error("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex);
rootSpan.annotate("HazelcastInstanceNotActiveException occurred while checking for duplicate event. Event will be delivered anyways.");
rootSpan.error(ex);
}

// circuit breaker is not open AND event is NO duplicate
Expand Down

0 comments on commit cc0868f

Please sign in to comment.