Skip to content

Commit

Permalink
Merge pull request #14 from telekom/feature/golaris-support
Browse files Browse the repository at this point in the history
Changes for 4.3.0
  • Loading branch information
Th3Shadowbroker authored Aug 12, 2024
2 parents 29063b8 + 12f3bc1 commit a172b71
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 41 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
awaitilityVersion=4.2.0
httpClientVersion=4.5.14
mockitoInlineVersion=5.2.0
horizonParentVersion=4.1.0
horizonParentVersion=4.2.0
wiremockStandaloneVersion=3.3.1
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public Optional<DeliveryTargetInformation> getDeliveryTargetInformation(String s

return subscription.map(subscriptionResource -> new DeliveryTargetInformation
(subscriptionResource.getSpec().getSubscription().getCallback(),
subscriptionResource.getSpec().getSubscription().getDeliveryType(),
subscriptionResource.getSpec().getSubscription().isCircuitBreakerOptOut(),
subscriptionResource.getSpec().getSubscription().getRetryableStatusCodes()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class DeliveryTargetInformation {
*/
private String url;

/**
* The delivery type of the subscription.
*/
private String deliveryType;

/**
* A boolean flag indicating whether the associated callback should active or deactivate the circuit breaker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public CompletableFuture<SendResult<String, String>> onMessage(@NonNull Consumer
return null;
}

log.debug("Received (message) ({}) record at partition {} and offset {} in topic {} with record id {}", MessageType.MESSAGE, record.partition(), record.offset(), record.topic(), record.key());
log.warn("Received (message) ({}) record at partition {} and offset {} in topic {} with record id {}", MessageType.MESSAGE, record.partition(), record.offset(), record.topic(), record.key());

try {
return subscribedEventMessageHandler.handleMessage(record);
Expand All @@ -77,6 +77,7 @@ public CompletableFuture<SendResult<String, String>> onMessage(@NonNull Consumer
@Override
public void onMessage(@NotNull List<ConsumerRecord<String, String>> records, @NotNull Acknowledgment acknowledgment) {
List<String> eventUuids = records.stream().map(ConsumerRecord::key).toList();
log.warn("Received batch of records with event ids [{}]", eventUuids);
try {
var afterDeliveringSendFutures = records.stream().map(this::onMessage).filter(Objects::nonNull).map(CompletableFuture::completedFuture).toList();
var sendFuturesArray = afterDeliveringSendFutures.toArray(new CompletableFuture[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
package de.telekom.horizon.comet.service;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import de.telekom.eni.pandora.horizon.cache.service.CacheService;
import de.telekom.eni.pandora.horizon.cache.service.JsonCacheService;
import de.telekom.eni.pandora.horizon.exception.JsonCacheException;
import de.telekom.eni.pandora.horizon.model.meta.CircuitBreakerMessage;
import de.telekom.eni.pandora.horizon.model.meta.CircuitBreakerStatus;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Date;

/**
* The {@code CircuitBreakerCacheService} class provides a service for interacting with a circuit breaker
* cache using Hazelcast. It allows checking the status of a circuit breaker and opening a circuit breaker.
Expand All @@ -21,12 +26,13 @@

@Component
@AllArgsConstructor
@Slf4j
public class CircuitBreakerCacheService {

/**
* The cache service used to store and retrieve circuit breaker status.
*/
private final CacheService circuitBreakerCache;
private final JsonCacheService<CircuitBreakerMessage> circuitBreakerCache;

/**
* Checks if the circuit breaker is open for the given subscription id.
Expand All @@ -36,26 +42,50 @@ public class CircuitBreakerCacheService {
* @throws HazelcastInstanceNotActiveException if the hazelcast instance is not active
*/
public boolean isCircuitBreakerOpenOrChecking(String subscriptionId) throws HazelcastInstanceNotActiveException {
var result = circuitBreakerCache.get(subscriptionId);
if (result.isPresent()) {
CircuitBreakerMessage circuitBreakerMessage = (CircuitBreakerMessage) result.get();
return CircuitBreakerStatus.OPEN.equals(circuitBreakerMessage.getStatus())
|| CircuitBreakerStatus.CHECKING.equals(circuitBreakerMessage.getStatus());
try {
var result = circuitBreakerCache.getByKey(subscriptionId);
if (result.isPresent()) {
CircuitBreakerMessage circuitBreakerMessage = result.get();
return CircuitBreakerStatus.OPEN.equals(circuitBreakerMessage.getStatus());
}
} catch (JsonCacheException e) {
log.error("Could not check status of circuit breaker for subscriptionId {}: {}", subscriptionId, e.getMessage());
e.printStackTrace();
}
return false;
}

/**
* Opens the circuit breaker associated with the given subscriptionId, providing the callback URL
* and environment information.
* and environment information. If the circuit breaker message is already present, the republishing
* count and the last opened is taken from the existing circuit breaker message.
*
* @param subscriptionId The subscriptionId for which to open the circuit breaker.
* @param callbackUrl The callback URL to be associated with the circuit breaker.
* @param environment The environment for which to open the circuit breaker.
* @throws HazelcastInstanceNotActiveException if the hazelcast instance is not active
*/
public void openCircuitBreaker(String subscriptionId, String callbackUrl, String environment) throws HazelcastInstanceNotActiveException {
var circuitBreakerMessage = new CircuitBreakerMessage(subscriptionId, CircuitBreakerStatus.OPEN, callbackUrl, environment);
circuitBreakerCache.update(subscriptionId, circuitBreakerMessage);
public void openCircuitBreaker(String subscriptionId, String eventType, String originMessageId, String environment) throws HazelcastInstanceNotActiveException {
var newCircuitBreakerMessage = new CircuitBreakerMessage(
subscriptionId,
eventType,
Date.from(Instant.now()),
originMessageId,
CircuitBreakerStatus.OPEN,
environment,
Date.from(Instant.now()),
0
);
try {
var result = circuitBreakerCache.getByKey(subscriptionId);

if (result.isPresent()) {
CircuitBreakerMessage existingCircuitBreakerMessage = result.get();
newCircuitBreakerMessage.setLoopCounter(existingCircuitBreakerMessage.getLoopCounter());
newCircuitBreakerMessage.setLastOpened(existingCircuitBreakerMessage.getLastOpened());
}
circuitBreakerCache.set(subscriptionId, newCircuitBreakerMessage);
} catch (JsonCacheException e) {
log.error("Could not open circuit breaker for subscriptionId {}: {}", subscriptionId, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import de.telekom.horizon.comet.cache.CallbackUrlCache;
import de.telekom.horizon.comet.cache.DeliveryTargetInformation;
import de.telekom.horizon.comet.config.CometConfig;
import de.telekom.horizon.comet.exception.CallbackUrlNotFoundException;
import de.telekom.horizon.comet.exception.CouldNotFetchAccessTokenException;
import de.telekom.horizon.comet.model.DeliveryResult;
import de.telekom.horizon.comet.model.DeliveryTaskRecord;
Expand Down Expand Up @@ -186,6 +187,23 @@ private void updateDeliveryState(Status status, SubscriptionEventMessage subscri
try (var ignored = tracer.withSpanInScope(deliveryResult.deliverySpan())) {
var afterSendFuture = stateService.updateState(status, subscriptionEventMessage, deliveryResult.exception());
if (status.equals(Status.DELIVERED) || status.equals(Status.FAILED)) {
var deliveryTypeOfSubscription = callbackUrlCache
.getDeliveryTargetInformation(subscriptionEventMessage.getSubscriptionId())
.map(DeliveryTargetInformation::getDeliveryType);

if (status == Status.FAILED
&& deliveryTypeOfSubscription.isPresent()
&& (deliveryTypeOfSubscription.get().equals("sse") || deliveryTypeOfSubscription.get().equals("server_sent_event"))
&& deliveryResult.exception() instanceof CallbackUrlNotFoundException) {

log.warn("Event with id {} and deliveryType {} and error {} was skipped by deduplication",
subscriptionEventMessage.getUuid(),
deliveryTypeOfSubscription.orElse("unknown"),
deliveryResult.exception().getClass().getSimpleName());

return;
}

afterSendFuture.thenAccept(result -> trackEventForDeduplication(subscriptionEventMessage));
}
} finally {
Expand Down Expand Up @@ -283,8 +301,10 @@ private boolean tryToRedeliver(DeliveryResult deliveryResult) {
* @param subscriptionEventMessage The SubscriptionEventMessage for which to open the circuit breaker.
*/
private void openCircuitBreaker(SubscriptionEventMessage subscriptionEventMessage) {
circuitBreakerCacheService.openCircuitBreaker(subscriptionEventMessage.getSubscriptionId(),
getCallbackUrlOrEmptyStr(subscriptionEventMessage),
circuitBreakerCacheService.openCircuitBreaker(
subscriptionEventMessage.getSubscriptionId(),
subscriptionEventMessage.getEvent().getType(),
subscriptionEventMessage.getUuid(),
subscriptionEventMessage.getEnvironment());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -84,7 +85,7 @@ public SubscribedEventMessageHandler(HorizonTracer tracer,
* @throws JsonProcessingException If there is an error processing the JSON.
*/
public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {
log.debug("Start handling message with id {}", consumerRecord.key());
log.warn("Start handling message with id {}", consumerRecord.key());
var subscriptionEventMessage = objectMapper.readValue(consumerRecord.value(), SubscriptionEventMessage.class);

DeliveryType deliveryType = subscriptionEventMessage.getDeliveryType();
Expand All @@ -98,6 +99,7 @@ public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecor
var rootSpan = tracer.startSpanFromKafkaHeaders("consume subscribed message", consumerRecord.headers());
var rootSpanInScope = tracer.withSpanInScope(rootSpan); // try-with-resources not possible because scope closes after try -> we need context in catch

log.warn("Start to handle Event with id {}", subscriptionEventMessage.getSubscriptionId());
afterStatusSendFuture = handleEvent(subscriptionEventMessage, rootSpan, clientId);

log.debug("Finished handling message with id {}", consumerRecord.key());
Expand All @@ -117,19 +119,26 @@ public CompletableFuture<SendResult<String, String>> handleMessage(ConsumerRecor
* @return CompletableFuture with SendResult based on event handling outcome.
*/
public CompletableFuture<SendResult<String, String>> handleEvent(SubscriptionEventMessage subscriptionEventMessage, Span rootSpan, HorizonComponentId messageSource) {
log.warn("Check circuitBreaker for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
if (isCircuitBreakerOpenOrChecking(subscriptionEventMessage)) {
rootSpan.annotate("Circuit Breaker open! Set event on WAITING");
return stateService.updateState(Status.WAITING, subscriptionEventMessage, null);
}

log.warn("Check deduplication for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
try {
String msgUuidOrNull = deDuplicationService.get(subscriptionEventMessage);
log.warn("Deduplication check for subscriptionId {} returned {}", subscriptionEventMessage.getSubscriptionId(), msgUuidOrNull);

boolean isDuplicate = Objects.nonNull(msgUuidOrNull);

if (isDuplicate) {
log.warn("Event with id {} is a duplicate. Check if it is the same event.", subscriptionEventMessage.getUuid());
// circuit breaker is not open AND event is a duplicate
return handleDuplicateEvent(subscriptionEventMessage, msgUuidOrNull);
}
} catch (HazelcastInstanceNotActiveException ex) {
log.warn("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex);
log.error("HazelcastInstanceNotActiveException occurred while checking for duplicate event with uuid {}. Event will be delivered anyways.", subscriptionEventMessage.getUuid(), ex);
rootSpan.annotate("HazelcastInstanceNotActiveException occurred while checking for duplicate event. Event will be delivered anyways.");
rootSpan.error(ex);
Expand Down Expand Up @@ -158,6 +167,7 @@ private boolean isCircuitBreakerOpenOrChecking(SubscriptionEventMessage subscrip
* @return CompletableFuture for the DELIVERING status sending
*/
private CompletableFuture<SendResult<String, String>> deliverEvent(SubscriptionEventMessage subscriptionEventMessage, HorizonComponentId clientId){
log.warn("Set event to DELIVERING and start delivery for subscriptionId {}", subscriptionEventMessage.getSubscriptionId());
CompletableFuture<SendResult<String, String>> afterStatusSendFuture = stateService.updateState(Status.DELIVERING, subscriptionEventMessage, null);
cometMetrics.recordE2eEventLatencyAndExtendMetadata(subscriptionEventMessage, MetricNames.EndToEndLatencyTardis, clientId);
deliveryService.deliver(subscriptionEventMessage, clientId); // Starts async task in pool
Expand All @@ -169,7 +179,6 @@ private CompletableFuture<SendResult<String, String>> deliverEvent(SubscriptionE
* Handles a duplicate subscription event and updates status accordingly.
*
* @param subscriptionEventMessage The SubscriptionEventMessage to handle.
* @param msgUuidOrNull The UUID of the duplicate message.
* @return CompletableFuture with SendResult based on event handling outcome.
*/
private CompletableFuture<SendResult<String, String>> handleDuplicateEvent(SubscriptionEventMessage subscriptionEventMessage, String msgUuidOrNull) {
Expand All @@ -184,5 +193,4 @@ private CompletableFuture<SendResult<String, String>> handleDuplicateEvent(Subsc

return afterStatusSendFuture;
}

}
Loading

0 comments on commit a172b71

Please sign in to comment.