Skip to content

Commit

Permalink
Merge pull request #16 from telekom/fix/restore-loglevel
Browse files Browse the repository at this point in the history
Use correct log levels
  • Loading branch information
Th3Shadowbroker authored Aug 15, 2024
2 parents 3a9ab34 + 7ac9aea commit 03505bf
Showing 1 changed file with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SubscribedEventMessageHandler(HorizonTracer tracer,
* @throws JsonProcessingException If there is an error processing the JSON.
*/
public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {
log.warn("Start handling message with id {}", consumerRecord.key());
log.debug("Start handling message with id {}", consumerRecord.key());
var subscriptionEventMessage = objectMapper.readValue(consumerRecord.value(), SubscriptionEventMessage.class);

DeliveryType deliveryType = subscriptionEventMessage.getDeliveryType();
Expand All @@ -99,7 +99,6 @@ public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecor
var rootSpan = tracer.startSpanFromKafkaHeaders("consume subscribed message", consumerRecord.headers());
var rootSpanInScope = tracer.withSpanInScope(rootSpan); // try-with-resources not possible because scope closes after try -> we need context in catch

log.warn("Start to handle Event with id {}", subscriptionEventMessage.getSubscriptionId());
afterStatusSendFuture = handleEvent(subscriptionEventMessage, rootSpan, clientId);

log.debug("Finished handling message with id {}", consumerRecord.key());
Expand All @@ -119,26 +118,25 @@ public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecor
* @return CompletableFuture with SendResult based on event handling outcome.
*/
public CompletableFuture<SendResult<String, String>> handleEvent(SubscriptionEventMessage subscriptionEventMessage, Span rootSpan, HorizonComponentId messageSource) {
log.warn("Check circuitBreaker for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
log.debug("Check circuitBreaker for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
if (isCircuitBreakerOpenOrChecking(subscriptionEventMessage)) {
rootSpan.annotate("Circuit Breaker open! Set event on WAITING");
return stateService.updateState(Status.WAITING, subscriptionEventMessage, null);
}

log.warn("Check deduplication for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
log.debug("Check deduplication for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
try {
String msgUuidOrNull = deDuplicationService.get(subscriptionEventMessage);
log.warn("Deduplication check for subscriptionId {} returned {}", subscriptionEventMessage.getSubscriptionId(), msgUuidOrNull);
log.debug("Deduplication check for subscriptionId {} returned {}", subscriptionEventMessage.getSubscriptionId(), msgUuidOrNull);

boolean isDuplicate = Objects.nonNull(msgUuidOrNull);

if (isDuplicate) {
log.warn("Event with id {} is a duplicate. Check if it is the same event.", subscriptionEventMessage.getUuid());
log.debug("Event with id {} is a duplicate. Check if it is the same event.", subscriptionEventMessage.getUuid());
// circuit breaker is not open AND event is a duplicate
return handleDuplicateEvent(subscriptionEventMessage, msgUuidOrNull);
}
} catch (HazelcastInstanceNotActiveException ex) {
log.warn("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex);
log.error("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex);
rootSpan.annotate("HazelcastInstanceNotActiveException occurred while checking for duplicate event. Event will be delivered anyways.");
rootSpan.error(ex);
Expand Down Expand Up @@ -167,7 +165,7 @@ private boolean isCircuitBreakerOpenOrChecking(SubscriptionEventMessage subscrip
* @return CompletableFuture for the DELIVERING status sending
*/
private CompletableFuture<SendResult<String, String>> deliverEvent(SubscriptionEventMessage subscriptionEventMessage, HorizonComponentId clientId){
log.warn("Set event to DELIVERING and start delivery for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
log.debug("Set event to DELIVERING and start delivery for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
CompletableFuture<SendResult<String, String>> afterStatusSendFuture = stateService.updateState(Status.DELIVERING, subscriptionEventMessage, null);
cometMetrics.recordE2eEventLatencyAndExtendMetadata(subscriptionEventMessage, MetricNames.EndToEndLatencyTardis, clientId);
deliveryService.deliver(subscriptionEventMessage, clientId); // Starts async task in pool
Expand Down

0 comments on commit 03505bf

Please sign in to comment.