moveQueueJobType,
- Clock clock) {
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
_eventStore = eventStore;
_jobService = jobService;
_moveQueueJobType = moveQueueJobType;
+ this.adminService = adminService;
+ this.producerService = producerService;
+ this.stepFunctionService = stepFunctionService;
+ this.parameterStoreUtil = new ParameterStoreUtil();
+
registerMoveQueueJobHandler(jobHandlerRegistry);
_queueSizeCache = CacheBuilder.newBuilder()
@@ -96,37 +129,148 @@ public MoveQueueResult run(MoveQueueRequest request)
});
}
+ /**
+ * Retrieves the value of the "isExperiment" flag from the cache if available.
+ * If the value is not present in the cache or the cache has expired, it fetches the value
+ * from AWS Parameter Store and stores it in the cache.
+ *
+ * The cached value has a TTL (Time-To-Live) of 5 minutes, after which it will be refreshed
+ * from the Parameter Store on the next access.
+ *
+ * @return {@code true} if the experiment is still running, otherwise {@code false}.
+ * @throws RuntimeException if there is an error fetching the value from the cache or Parameter Store.
+ */
+ private boolean getIsExperimentValue() {
+ try {
+ // Attempt to retrieve from cache
+ return experimentCache.get(IS_EXPERIMENT, () -> {
+ // On a cache miss or expiry, fetch from Parameter Store; the loader's result is cached
+ Boolean checkExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/isExperiment"));
+ _log.info("IS_EXPERIMENT is refreshed {}", checkExperiment);
+ return checkExperiment;
+ });
+ } catch (Exception e) {
+ // Handle any errors that might occur while accessing the cache or Parameter Store
+ throw new RuntimeException("Error fetching experiment flag", e);
+ }
+ }
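+
+ // Presumed construction of experimentCache (not shown in this hunk): a Guava cache with the
+ // 5-minute TTL described above, e.g.
+ // private final Cache<String, Boolean> experimentCache =
+ // CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();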
+
@Override
public void send(String queue, Object message) {
- sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ boolean isExperiment = getIsExperimentValue();
+ if (!isExperiment) {
+ // Experiment is over now, send everything to Kafka
+ sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ } else {
+ List<String> allowedQueues = fetchAllowedQueues();
+ // Experiment is still running, check if the queue is allowed
+ if (allowedQueues.contains(queue)) {
+ // Send to Kafka only if the queue is allowed
+ sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ } else {
+ // Send to Cassandra (rollback plan)
+ sendAll(queue, Collections.singleton(message), false);
+ }
+ }
}
@Override
public void sendAll(String queue, Collection<?> messages) {
- sendAll(Collections.singletonMap(queue, messages));
+ boolean isExperiment = getIsExperimentValue();
+ if (!isExperiment) {
+ // Experiment is over now, send everything to Kafka
+ sendAll(Collections.singletonMap(queue, messages));
+ } else {
+ List<String> allowedQueues = fetchAllowedQueues();
+ // Experiment is still running, check if the queue is allowed
+ if (allowedQueues.contains(queue)) {
+ // Send to Kafka only if the queue is allowed
+ sendAll(Collections.singletonMap(queue, messages));
+ } else {
+ // Send to Cassandra (rollback plan)
+ sendAll(queue, messages, false);
+ }
+ }
+ }
+
+
+ private void validateMessage(Object message) {
+ // Check if the message is valid using JsonValidator
+ ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
+
+ // Check if the message size exceeds the allowed limit
+ checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES,
+ "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+
+ }
+
+ private void validateQueue(String queue, Collection<?> messages) {
+ requireNonNull(queue, "Queue name cannot be null");
+ requireNonNull(messages, "Messages collection cannot be null");
+
+ // Check if the queue name is legal
+ checkLegalQueueName(queue);
}
@Override
public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
requireNonNull(messagesByQueue, "messagesByQueue");
- ImmutableMultimap.Builder<String, ByteBuffer> builder = ImmutableMultimap.builder();
+ ImmutableMultimap.Builder<String, String> builder = ImmutableMultimap.builder();
for (Map.Entry<String, ? extends Collection<?>> entry : messagesByQueue.entrySet()) {
String queue = entry.getKey();
Collection<?> messages = entry.getValue();
- checkLegalQueueName(queue);
- requireNonNull(messages, "messages");
+ validateQueue(queue, messages);
- List<ByteBuffer> events = Lists.newArrayListWithCapacity(messages.size());
- for (Object message : messages) {
- ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
- checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+ List<String> events = Lists.newArrayListWithCapacity(messages.size());
- events.add(messageByteBuffer);
+ // Validate each message (size and JSON checks on the serialized bytes), then forward its string form
+ for (Object message : messages) {
+ validateMessage(message);
+ events.add(message.toString());
}
builder.putAll(queue, events);
}
+
+ Multimap<String, String> eventsByChannel = builder.build();
+
+ String queueType = determineQueueType();
+ for (Map.Entry<String, Collection<String>> topicEntry : eventsByChannel.asMap().entrySet()) {
+ String queueName = topicEntry.getKey();
+ String topic = "dsq-" + (("dedupq".equals(queueType)) ? "dedup-" + queueName : queueName);
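+ // e.g. queue "photo-uploads" maps to topic "dsq-photo-uploads", or "dsq-dedup-photo-uploads" for a dedup queue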
+ // Check if the topic exists, if not create it and execute Step Function
+ if (!adminService.createTopicIfNotExists(topic, TOPIC_PARTITION_COUNT, TOPIC_REPLICATION_FACTOR, queueType)) {
+ Map<String, String> parameters = fetchStepFunctionParameters();
+ // Execute Step Function after topic creation
+ startStepFunctionExecution(parameters, queueType, queueName, topic);
+ }
+ producerService.sendMessages(topic, topicEntry.getValue(), queueType);
+ _log.info("Messages sent to topic: {}", topic);
+ }
+ }
+
+ @Override
+ public void sendAll(String queue, Collection<?> messages, boolean fromKafka) {
+ // Messages consumed from Kafka are written straight to Cassandra; only validate messages that did not come from Kafka
+ if (!fromKafka) {
+ validateQueue(queue, messages);
+ }
+ ImmutableMultimap.Builder<String, ByteBuffer> builder = ImmutableMultimap.builder();
+ List<ByteBuffer> events = Lists.newArrayListWithCapacity(messages.size());
+
+ for (Object message : messages) {
+ ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
+ checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES,
+ "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+ events.add(messageByteBuffer);
+ }
+ builder.putAll(queue, events);
Multimap<String, ByteBuffer> eventsByChannel = builder.build();
_eventStore.addAll(eventsByChannel);
@@ -137,6 +281,11 @@ public long getMessageCount(String queue) {
return getMessageCountUpTo(queue, Long.MAX_VALUE);
}
+ @Override
+ public long getUncachedSize(String queue) {
+ return internalMessageCountUpTo(queue, Long.MAX_VALUE);
+ }
+
@Override
public long getMessageCountUpTo(String queue, long limit) {
// We get the size from cache as a tuple of size, and the limit used to estimate that size
@@ -195,7 +344,6 @@ public void renew(String queue, Collection<String> messageIds, Duration claimTtl
public void acknowledge(String queue, Collection<String> messageIds) {
checkLegalQueueName(queue);
requireNonNull(messageIds, "messageIds");
-
_eventStore.delete(queue, messageIds, true);
}
@@ -274,4 +422,103 @@ private void checkLegalQueueName(String queue) {
"Allowed punctuation characters are -.:@_ and the queue name may not start with a single underscore character. " +
"An example of a valid table name would be 'polloi:provision'.");
}
-}
+ /**
+ * Fetches the necessary Step Function parameters from AWS Parameter Store.
+ */
+ private Map<String, String> fetchStepFunctionParameters() {
+ List<String> parameterNames = Arrays.asList(
+ "/" + UNIVERSE + "/emodb/stepfn/stateMachineArn",
+ "/" + UNIVERSE + "/emodb/stepfn/queueThreshold",
+ "/" + UNIVERSE + "/emodb/stepfn/batchSize",
+ "/" + UNIVERSE + "/emodb/stepfn/interval"
+ );
+
+ try {
+ return parameterStoreUtil.getParameters(parameterNames);
+ } catch (Exception e) {
+ _log.error("Failed to fetch Step Function parameters from Parameter Store", e);
+ throw new RuntimeException("Error fetching Step Function parameters", e);
+ }
+ }
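+
+ // Illustrative Parameter Store layout assumed above (example values, not actual config):
+ // /<universe>/emodb/stepfn/stateMachineArn -> "arn:aws:states:us-east-1:123456789012:stateMachine:..."
+ // /<universe>/emodb/stepfn/queueThreshold -> "1000"
+ // /<universe>/emodb/stepfn/batchSize -> "100"
+ // /<universe>/emodb/stepfn/interval -> "60"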
+
+ /**
+ * Executes the Step Function for a given topic after it has been created.
+ */
+ private void startStepFunctionExecution(Map<String, String> parameters, String queueType, String queueName, String topic) {
+ try {
+ String stateMachineArn = parameters.get("/" + UNIVERSE + "/emodb/stepfn/stateMachineArn");
+ int queueThreshold = Integer.parseInt(parameters.get("/" + UNIVERSE + "/emodb/stepfn/queueThreshold"));
+ int batchSize = Integer.parseInt(parameters.get("/" + UNIVERSE + "/emodb/stepfn/batchSize"));
+ int interval = Integer.parseInt(parameters.get("/" + UNIVERSE + "/emodb/stepfn/interval"));
+
+ String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval);
+
+ // Current time in milliseconds
+ String timestamp = String.valueOf(System.currentTimeMillis());
+ queueName = stepFunctionService.sanitizeExecutionName(queueName);
+ // Check if queueType is "dedupq" and prepend "D" to execution name if true
+ String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;
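+ // e.g. "photo_uploads_1700000000000", or "D_photo_uploads_1700000000000" for a dedup queue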
+ // Start the Step Function execution
+ stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
+
+ _log.info("Step Function executed for topic: {} with executionName: {}", topic, executionName);
+ } catch (Exception e) {
+ _log.error("Error executing Step Function for topic: {}", topic, e);
+ throw new RuntimeException("Error executing Step Function for topic: " + topic, e);
+ }
+ }
+
+ /**
+ * Determines the queue type based on the event store.
+ */
+ private String determineQueueType() {
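+ // Class-name comparison, presumably to avoid a compile-time dependency on the dedup event-store implementation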
+ if (_eventStore.getClass().getName().equals("com.bazaarvoice.emodb.event.dedup.DefaultDedupEventStore")) {
+ return "dedupq";
+ }
+ return "queue";
+ }
+
+ private List<String> fetchAllowedQueues() {
+ try {
+ return new ArrayList<>(allowedQueuesCache.get(ALLOWED_QUEUES, this::fetchAllowedQueuesFromParamStore));
+ } catch (Exception e) {
+ // Handle the case when the parameter is not found or fetching fails
+ _log.error("Error fetching allowedQueues in fetchAllowedQueues: " + e.getMessage());
+ return Collections.singletonList(""); // Default to an empty list if the parameter is missing
+ }
+
+ }
+
+ private Set<String> fetchAllowedQueuesFromParamStore() {
+ try {
+ // Fetch the 'allowedQueues' parameter using ParameterStoreUtil
+ String allowedQueuesStr = parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/allowedQueues");
+ _log.info("ALLOWED_QUEUES is refreshed");
+ return new HashSet<>(Arrays.asList(allowedQueuesStr.split(",")));
+ } catch (Exception e) {
+ // Handle the case when the parameter is not found or fetching fails
+ _log.error("Error fetching allowedQueues: " + e.getMessage());
+ return new HashSet<>(Collections.singletonList("")); // Default to an empty list if the parameter is missing
+ }
+ }
+
+ private String createInputPayload(int queueThreshold, int batchSize, String queueType, String queueName, String topicName, int interval) {
+ Map<String, Object> payloadData = new HashMap<>();
+ payloadData.put("queueThreshold", queueThreshold);
+ payloadData.put("batchSize", batchSize);
+ payloadData.put("queueType", queueType);
+ payloadData.put("queueName", queueName);
+ payloadData.put("topicName", topicName);
+ payloadData.put("interval", interval);
+ Map<String, Object> wrappedData = new HashMap<>();
+ wrappedData.put("executionInput", payloadData); // Wrap the data
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.writeValueAsString(wrappedData); // Convert wrapped data to JSON
+ } catch (JsonProcessingException e) {
+ _log.error("Error while converting map to JSON", e);
+ return "{}"; // Return empty JSON object on error
+ }
+ }
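+
+ // Example payload produced above (illustrative values):
+ // {"executionInput":{"queueThreshold":1000,"batchSize":100,"queueType":"queue",
+ // "queueName":"photo-uploads","topicName":"dsq-photo-uploads","interval":60}}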
+}
\ No newline at end of file
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
index 85ef29c4fc..9ec8d36606 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
@@ -4,6 +4,9 @@
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.queue.api.DedupQueueService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import java.time.Clock;
@@ -11,7 +14,7 @@
public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService {
@Inject
public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
- Clock clock) {
- super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock);
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
+ super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock, adminService, producerService, stepFunctionService);
}
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
index 71f9a61ed3..524ca50033 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
@@ -4,6 +4,9 @@
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.queue.api.QueueService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import java.time.Clock;
@@ -11,7 +14,7 @@
public class DefaultQueueService extends AbstractQueueService implements QueueService {
@Inject
public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
- Clock clock) {
- super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock);
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
+ super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock, adminService, producerService, stepFunctionService);
}
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/ExecutionInputWrapper.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/ExecutionInputWrapper.java
new file mode 100644
index 0000000000..13f396fe9a
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/ExecutionInputWrapper.java
@@ -0,0 +1,14 @@
+package com.bazaarvoice.emodb.queue.core.Entities;
+
+public class ExecutionInputWrapper {
+ private QueueExecutionAttributes executionInput;
+
+ // Getter and Setter
+ public QueueExecutionAttributes getExecutionInput() {
+ return executionInput;
+ }
+
+ public void setExecutionInput(QueueExecutionAttributes executionInput) {
+ this.executionInput = executionInput;
+ }
+}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/QueueExecutionAttributes.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/QueueExecutionAttributes.java
new file mode 100644
index 0000000000..a35b60cbaf
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/Entities/QueueExecutionAttributes.java
@@ -0,0 +1,109 @@
+package com.bazaarvoice.emodb.queue.core.Entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+import java.util.Objects;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class QueueExecutionAttributes {
+
+ private String queueName;
+ private String queueType;
+ private String topicName;
+ private Integer queueThreshold;
+ private Integer batchSize;
+ private Integer interval;
+ private String status;
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public void setQueueType(String queueType) {
+ this.queueType = queueType;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public void setQueueThreshold(int queueThreshold) {
+ this.queueThreshold = queueThreshold;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void setInterval(int interval) {
+ this.interval = interval;
+ }
+
+ public Integer getQueueThreshold() {
+ return queueThreshold;
+ }
+
+ public Integer getBatchSize() {
+ return batchSize;
+ }
+
+ public Integer getInterval() {
+ return interval;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getQueueType() {
+ return queueType;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public String getJsonPayload(QueueExecutionAttributes attributes) throws JsonProcessingException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.writeValueAsString(attributes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof QueueExecutionAttributes)) return false;
+ QueueExecutionAttributes that = (QueueExecutionAttributes) o;
+ return Objects.equals(getQueueName(), that.getQueueName()) && Objects.equals(getQueueType(), that.getQueueType()) && Objects.equals(getTopicName(), that.getTopicName()) && Objects.equals(getQueueThreshold(), that.getQueueThreshold()) && Objects.equals(getBatchSize(), that.getBatchSize()) && Objects.equals(getInterval(), that.getInterval());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getQueueName(), getQueueType(), getTopicName(), getQueueThreshold(), getBatchSize(), getInterval());
+ }
+
+ @Override
+ public String toString() {
+ return "QueueExecutionAttributes{" +
+ "queueName='" + queueName + '\'' +
+ ", queueType='" + queueType + '\'' +
+ ", topicName='" + topicName + '\'' +
+ ", queueThreshold=" + queueThreshold +
+ ", batchSize=" + batchSize +
+ ", interval=" + interval +
+ ", status='" + status + '\'' +
+ '}';
+ }
+}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java
index 47e24ccf38..b349b19298 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java
@@ -91,6 +91,11 @@ public void purge(String apiKey, String queue) {
_dedupQueueService.purge(queue);
}
+ @Override
+ public void sendAll(String apiKey, String queue, Collection<?> messages, boolean isFlush) {
+ _dedupQueueService.sendAll(queue, messages, isFlush);
+ }
+
@Override
public void sendAll(String apiKey, Map<String, ? extends Collection<?>> messagesByQueue) {
_dedupQueueService.sendAll(messagesByQueue);
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java
index 5ceea10a8e..cdafc8935e 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java
@@ -95,4 +95,9 @@ public void purge(String apiKey, String queue) {
public void sendAll(String apiKey, Map<String, ? extends Collection<?>> messagesByQueue) {
_queueService.sendAll(messagesByQueue);
}
+
+ @Override
+ public void sendAll(String apiKey, String queue, Collection<?> messages, boolean isFlush) {
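+ // No-op implementation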
+
+ }
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java
new file mode 100644
index 0000000000..9621617ba3
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java
@@ -0,0 +1,120 @@
+package com.bazaarvoice.emodb.queue.core.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import java.util.Collections;
+
+public class KafkaAdminService {
+ private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class);
+ private final AdminClient adminClient;
+ // Cache for the list of all topics, with a TTL of 1 minute
+ private final Cache<String, Set<String>> topicListCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .build();
+
+ private static final String TOPIC_LIST_KEY = "allTopics";
+
+
+ public KafkaAdminService() {
+ this.adminClient = AdminClient.create(KafkaConfig.getAdminProps());
+ }
+
+ /**
+ * Creates a new Kafka topic with the specified configurations if it does not already exist.
+ *
+ * @param topic The name of the topic.
+ * @param numPartitions Number of partitions.
+ * @param replicationFactor Replication factor.
+ * @param queueType The queue type ("queue" or "dedupq").
+ * @return {@code true} if the topic already existed, {@code false} if it was just created.
+ */
+ public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) {
+ Boolean isExisting = isTopicExists(topic);
+ if (!isExisting) {
+ // Create the topic now
+ NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
+ try {
+ adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+ addToCache(topic);
+ _log.info("Created topic: {} with numPartitions: {} and replication factor {} ", topic, numPartitions, replicationFactor);
+ } catch (Exception e) {
+ _log.error("Error creating topic {}: ", topic, e);
+ throw new RuntimeException(e);
+ }
+ }
+ return isExisting;
+ }
+ public void addToCache(String topic) {
+ Set<String> topics = topicListCache.getIfPresent(TOPIC_LIST_KEY);
+ if (topics == null) {
+ topics = new HashSet<>();
+ } else {
+ // Create a new mutable Set if the existing one is unmodifiable
+ topics = new HashSet<>(topics);
+ }
+ topics.add(topic);
+ topicListCache.put(TOPIC_LIST_KEY, topics);
+ _log.info("Added newly created topic to cache: {}", topic);
+ }
+
+
+ /**
+ * Checks if a Kafka topic exists by using a cache to store the list of all topics.
+ * If the cache entry has expired or the cache is empty, it queries the Kafka AdminClient for the topic list.
+ *
+ * The cached list has a TTL (time-to-live) of 1 minute, after which it will be refreshed
+ * from Kafka on the next access.
+ *
+ *
+ * @param topic the name of the Kafka topic to check
+ * @return {@code true} if the topic exists, otherwise {@code false}.
+ * @throws RuntimeException if there is an error fetching the topic list or checking if the topic exists.
+ */
+ public boolean isTopicExists(String topic) {
+ try {
+ // Retrieve the list of topics from the cache
+ Set<String> topics = topicListCache.get(TOPIC_LIST_KEY, this::fetchTopicListFromKafka);
+
+ // Check if the given topic is in the cached list
+ return topics.contains(topic);
+ } catch (ExecutionException e) {
+ _log.error("Failed to check if topic exists: {}", topic, e);
+ throw new RuntimeException("Error checking if topic exists", e);
+ }
+ }
+
+ /**
+ * Fetches the list of all topic names from Kafka AdminClient.
+ * This method is called only when the cache is expired or empty.
+ *
+ * @return a Set containing all topic names.
+ * @throws ExecutionException if there is an error fetching the topic list from Kafka.
+ */
+ private Set<String> fetchTopicListFromKafka() throws ExecutionException {
+ try {
+ _log.info("Fetching topic list from Kafka");
+ return adminClient.listTopics().names().get();
+ } catch (Exception e) {
+ _log.error("Error fetching topic list from Kafka", e);
+ throw new ExecutionException(e);
+ }
+ }
+
+ /**
+ * Closes the AdminClient to release resources.
+ */
+ public void close() {
+ if (adminClient != null) {
+ adminClient.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java
new file mode 100644
index 0000000000..c4ab54ad99
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java
@@ -0,0 +1,157 @@
+package com.bazaarvoice.emodb.queue.core.kafka;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.Parameter;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class KafkaConfig {
+ private static String bootstrapServersConfig;
+ private static String batchSizeConfig;
+ private static String retriesConfig;
+ private static String lingerMsConfig;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+ // Static SSM Client and configuration using AWS SDK v1
+ private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder
+ .standard()
+ .build();
+
+
+ static {
+ try {
+ final String UNIVERSE = getUniverseFromEnv();
+ // Load configurations from SSM during static initialization
+ Map<String, String> parameterValues = getParameterValues(
+ Arrays.asList(
+ "/" + UNIVERSE + "/emodb/kafka/batchSize",
+ "/" + UNIVERSE + "/emodb/kafka/retries",
+ "/" + UNIVERSE + "/emodb/kafka/lingerMs",
+ "/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
+ )
+ );
+
+ // Set configurations with fallback to defaults if not present
+ // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
+ batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
+
+ // Sets the number of retry attempts for failed Kafka message sends.
+ retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
+
+ // Sets the number of milliseconds a producer is willing to wait before sending a batch out.
+ lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
+
+ // Configures the Kafka broker addresses for producer connections.
+ bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", "localhost:9092");
+
+ logger.info("Kafka configurations loaded successfully from SSM.");
+ } catch (AmazonServiceException e) {
+ logger.error("Failed to load configurations from SSM. Using default values.", e);
+ throw e;
+ }
+ catch (Exception e) {
+ logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e);
+ throw e;
+ }
+ }
+
+ public static String getUniverseFromEnv() {
+ String filePath = "/etc/environment";
+ logger.info("Reading environment file: " + filePath);
+ Properties environmentProps = new Properties();
+
+ try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ // Skip empty lines or comments
+ if (line.trim().isEmpty() || line.trim().startsWith("#")) {
+ continue;
+ }
+ // Split the line into key-value pair
+ String[] parts = line.split("=", 2);
+ logger.info("parts: " + Arrays.toString(parts));
+ if (parts.length == 2) {
+ String key = parts[0].trim();
+ String value = parts[1].trim();
+ // Remove any surrounding quotes from value
+ value = value.replace("\"", "");
+ environmentProps.put(key, value);
+ }
+ }
+ // Access the environment variables
+ return environmentProps.getProperty("UNIVERSE");
+ } catch (IOException e) {
+ logger.error("Error reading environment file: " + e.getMessage());
+ throw new RuntimeException("Error reading environment file: " + e.getMessage());
+ }
+ }
+ // Fetch parameters from AWS SSM using AWS SDK v1
+ private static Map<String, String> getParameterValues(List<String> parameterNames) {
+ try {
+ GetParametersRequest request = new GetParametersRequest()
+ .withNames(parameterNames)
+ .withWithDecryption(true);
+
+ GetParametersResult response = ssmClient.getParameters(request);
+
+ return response.getParameters().stream()
+ .collect(Collectors.toMap(Parameter::getName, Parameter::getValue));
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from SSM.", e);
+ throw e; // Rethrow or handle the exception if necessary
+ }
+ }
+
+ // Kafka Producer properties
+ public static Properties getProducerProps() {
+ Properties producerProps = new Properties();
+
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig));
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig));
+ producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig));
+ producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting
+ logger.info("Kafka Producer properties initialized.");
+ return producerProps;
+ }
+
+ // Kafka Admin properties
+ public static Properties getAdminProps() {
+ Properties adminProps = new Properties();
+
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+ logger.info("Kafka Admin properties initialized.");
+ return adminProps;
+ }
+
+ // Ensure the SSM client is closed when the application shuts down
+ public static void shutdown() {
+ if (ssmClient != null) {
+ try {
+ ssmClient.shutdown();
+ logger.info("SSM client closed successfully.");
+ } catch (Exception e) {
+ logger.error("Error while closing SSM client.", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java
new file mode 100644
index 0000000000..efbfc5f27c
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java
@@ -0,0 +1,66 @@
+package com.bazaarvoice.emodb.queue.core.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+
+public class KafkaProducerService {
+ private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class);
+ private final KafkaProducer<String, String> producer;
+
+ public KafkaProducerService() {
+ this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps());
+ _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps());
+ }
+
+ /**
+ * Sends each message from the collection to the specified Kafka topic separately.
+ *
+ * @param topic The Kafka topic.
+ * @param events The collection of messages to be sent.
+ * @param queueType The queue type ("queue" or "dedupq").
+ */
+ public void sendMessages(String topic, Collection<String> events, String queueType) {
+ LocalDateTime startTime = LocalDateTime.now();
+ _log.info("Sending {} messages to topic '{}'", events.size(), topic);
+ List<Future<RecordMetadata>> futures = new ArrayList<>();
+ // Use async sendMessage and collect futures
+ for (String event : events) {
+ futures.add(producer.send(new ProducerRecord<>(topic, event)));
+ }
+
+ // Wait for all futures to complete
+ for (Future<RecordMetadata> future : futures) {
+ try {
+ future.get(); // Only blocks if a future is not yet complete
+ } catch (Exception e) {
+ _log.error("Error while sending message to Kafka: {}", e.getMessage());
+ throw new RuntimeException("Error sending messages to Kafka", e);
+ }
+ }
+ _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis());
+ }
+
+
+ /**
+ * Closes the producer to release resources.
+ */
+ public void close() {
+ _log.info("Closing Kafka producer.");
+ try {
+ producer.flush();
+ producer.close();
+ } catch (Exception e) {
+ _log.error("Error while closing Kafka producer: ", e);
+ throw e;
+ }
+ }
+}
\ No newline at end of file
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java
new file mode 100644
index 0000000000..f1bfdb0d01
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java
@@ -0,0 +1,127 @@
+package com.bazaarvoice.emodb.queue.core.ssm;
+
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult;
+import com.amazonaws.services.simplesystemsmanagement.model.PutParameterRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.PutParameterResult;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for interacting with AWS Parameter Store using AWS SDK v1.
+ */
+public class ParameterStoreUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class);
+ private final AWSSimpleSystemsManagement ssmClient;
+
+ /**
+ * Constructor to initialize the SSM client
+ */
+ public ParameterStoreUtil() {
+ // Create SSM client with default credentials and region
+ ssmClient = AWSSimpleSystemsManagementClientBuilder.standard()
+ .build();
+ }
+
+ /**
+ * Fetches a parameter from AWS Parameter Store.
+ *
+ * @param parameterName The name of the parameter to fetch
+ * @return The value of the parameter
+ * @throws IllegalArgumentException If the parameterName is null or empty
+ */
+ public String getParameter(String parameterName) {
+ if (parameterName == null || parameterName.isEmpty()) {
+ logger.error("Parameter name cannot be null or empty");
+ throw new IllegalArgumentException("Parameter name cannot be null or empty");
+ }
+
+ try {
+
+ GetParameterRequest request = new GetParameterRequest().withName(parameterName);
+ GetParameterResult result = ssmClient.getParameter(request);
+ return result.getParameter().getValue();
+
+ } catch (ParameterNotFoundException e) {
+ logger.error("Parameter not found: {}", parameterName, e);
+ throw new RuntimeException("Parameter not found: " + parameterName, e);
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameter: {}", parameterName, e);
+ throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e);
+ }
+ }
+
+ /**
+ * Fetches multiple parameters from AWS Parameter Store in a batch.
+ *
+ * @param parameterNames The list of parameter names to fetch
+ * @return A map of parameter names to their values
+ * @throws IllegalArgumentException If the parameterNames list is null or empty
+ */
+ public Map<String, String> getParameters(List<String> parameterNames) {
+ if (parameterNames == null || parameterNames.isEmpty()) {
+ logger.error("Parameter names list cannot be null or empty");
+ throw new IllegalArgumentException("Parameter names list cannot be null or empty");
+ }
+
+ try {
+
+ GetParametersRequest request = new GetParametersRequest().withNames(parameterNames);
+ GetParametersResult result = ssmClient.getParameters(request);
+
+ // Map the result to a Map of parameter names and values
+ Map<String, String> parameters = new HashMap<>();
+ result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue()));
+
+ // Log any parameters that were not found
+ if (!result.getInvalidParameters().isEmpty()) {
+ logger.warn("The following parameters were not found: {}", result.getInvalidParameters());
+ }
+
+ return parameters;
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameters: {}", parameterNames, e);
+ throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e);
+ }
+ }
+
+ public Long updateParameter(String key, String value) {
+ try {
+ if (key == null || key.trim().isEmpty()) {
+ logger.error("parameter name cannot be null or blank");
+ throw new IllegalArgumentException("parameter name cannot be null or blank");
+ }
+
+ PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true);
+
+ PutParameterResult response = ssmClient.putParameter(request);
+ logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion());
+ return response.getVersion();
+ } catch (Exception e) {
+ logger.error("Failed to update parameter: " + key + " with value: " + value, e);
+ throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e);
+ }
+ }
+
+}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java
new file mode 100644
index 0000000000..a41d95a5a7
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java
@@ -0,0 +1,432 @@
+package com.bazaarvoice.emodb.queue.core.stepfn;
+
+
+import com.amazonaws.services.stepfunctions.AWSStepFunctions;
+import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
+import com.amazonaws.services.stepfunctions.model.*;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaConfig;
+import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
+import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
+import com.bazaarvoice.emodb.queue.core.Entities.QueueExecutionAttributes;
+import com.bazaarvoice.emodb.queue.core.Entities.ExecutionInputWrapper;
+import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.util.List;
+import java.util.MissingResourceException;
+
+/**
+ * Service to interact with AWS Step Functions using AWS SDK v1.
+ */
+public class StepFunctionService {
+
+ private static final Logger logger = LoggerFactory.getLogger(StepFunctionService.class);
+
+ private final AWSStepFunctions stepFunctionsClient;
+ private static String universe;
+ private final ParameterStoreUtil _parameterStoreUtil;
+
+ /**
+ * Constructor to initialize Step Function Client with AWS region and credentials.
+ */
+ public StepFunctionService() {
+ universe = KafkaConfig.getUniverseFromEnv();
+ this._parameterStoreUtil = new ParameterStoreUtil();
+ this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard().build();
+ }
+
+ /**
+ * Sanitizes the execution name by replacing invalid characters with underscores
+ * and truncating if needed.
+ */
+ public String sanitizeExecutionName(String executionName) {
+ if (executionName == null || executionName.isEmpty()) {
+ throw new IllegalArgumentException("Execution name cannot be null or empty");
+ }
+ executionName = executionName.trim();
+ // Replace invalid characters with underscores
+ String sanitized = executionName.replaceAll("[^a-zA-Z0-9\\-_]", "_");
+
+ // Check if the sanitized name is empty or consists only of underscores
+ if (sanitized.isEmpty() || sanitized.replaceAll("_", "").isEmpty()) {
+ throw new IllegalArgumentException("Execution name cannot contain only invalid characters");
+ }
+
+ // Truncate from the beginning if length exceeds 66 characters
+ if (sanitized.length() > 66) {
+ sanitized = sanitized.substring(sanitized.length() - 66);
+ }
+
+ // Log the updated execution name if it has changed
+ if (!sanitized.equals(executionName)) {
+ logger.info("Updated execution name: {}", sanitized);
+ }
+ return sanitized;
+ }
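+
+ // e.g. "photo uploads/v2" sanitizes to "photo_uploads_v2"; names longer than 66 characters keep only the
+ // trailing 66, so the "_" + millisecond-timestamp suffix (14 chars) fits Step Functions' 80-character name limit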
+
+
+ /**
+ * Starts the execution of a Step Function with provided execution attributes.
+ *
+ * @param queueExecutionAttributes execution input attributes
+ * @throws RuntimeException If method fails to re-start or start with provided execution input attributes
+ *
+ * queueType and queueName are mandatory inputs.
+ * CASE-1 (status = "DISABLED" provided): the active execution, if any, is stopped and no new one is started.
+ * CASE-2 (all four of queueThreshold, batchSize, interval, topicName provided): a new execution is started with these attributes, stopping any active one.
+ * CASE-3 (some or all of those four inputs missing, and an active execution exists): the missing inputs are copied from the active execution, which is stopped and replaced.
+ * CASE-4 (some or all of those four inputs missing, and no active execution exists): an IllegalArgumentException is thrown.
+ */
+ public void startSFNWithAttributes(QueueExecutionAttributes queueExecutionAttributes) {
+ QueueExecutionAttributes existingAttributes;
+
+ //1. fetch attributes for any existing execution
+ try {
+ existingAttributes = getExistingSFNAttributes(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName());
+ } catch (Exception e) {
+ logger.error("Error getting existing step-function attributes for " + queueExecutionAttributes + " | " + e.getMessage());
+ throw new RuntimeException("Error getting existing step-function attributes for " + queueExecutionAttributes + " | " + e.getMessage());
+ }
+
+ //2. if no running execution exists, start a new one with provided/new attributes
+ if (existingAttributes == null) {
+ try {
+ startExecution(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName(), queueExecutionAttributes);
+ return;
+ } catch(Exception e){
+ logger.error("Error starting step-function with attributes " + queueExecutionAttributes + " | " + e.getMessage());
+ throw new RuntimeException("Error starting step-function with attributes " + queueExecutionAttributes + " | " + e.getMessage());
+ }
+ }
+
+ //3. fill in any missing attributes from the existing execution and validate them before stopping it
+ syncFreshAttributesFromExistingExecution(queueExecutionAttributes, existingAttributes);
+
+ //4. stop active execution (if any)
+ try {
+ stopActiveExecutions(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName());
+ logger.info("Successfully stopped active execution(if any) for queueName: " + queueExecutionAttributes.getQueueName() + ", queueType: " + queueExecutionAttributes.getQueueType());
+ } catch(Exception e){
+ logger.error("Error stopping step-function for queueName: " + queueExecutionAttributes.getQueueName() + ", queueType: " + queueExecutionAttributes.getQueueType() + " | " + e.getMessage());
+ throw new RuntimeException("Error stopping step-function for queueName: " + queueExecutionAttributes.getQueueName() + ", queueType: " + queueExecutionAttributes.getQueueType() + " | " + e.getMessage());
+ }
+
+ //5. start a fresh execution with the merged attributes (effectively a re-start if nothing changed)
+ try {
+ startExecution(queueExecutionAttributes.getQueueType(), queueExecutionAttributes.getQueueName(), queueExecutionAttributes);
+ } catch (Exception e){
+ logger.error("Error re-starting step-function with attributes " + queueExecutionAttributes + " | " + e.getMessage());
+ throw new RuntimeException("Error re-starting step-function with attributes " + queueExecutionAttributes + "|" + e.getMessage());
+ }
+ }
+
+
+ /**
+ * Starts an execution of step-function associated with (queueType, queueName), with provided attributes.
+ *
+ * @param queueType queueType
+ * @param queueName queueName
+ * @param executionAttributes execution inputs
+ *
+ */
+ public void startExecution(String queueType, String queueName, QueueExecutionAttributes executionAttributes) throws JsonProcessingException {
+
+ if(executionAttributes == null) {
+ throw new IllegalArgumentException("execution input object can't be null");
+ }
+
+ if(executionAttributes.getStatus() == null || executionAttributes.getStatus().isEmpty()) {
+ executionAttributes.setStatus("ENABLED");
+ }
+
+ if(executionAttributes.getStatus().equals("DISABLED")) {
+ logger.info("step-function's execution can't be triggered because status=DISABLED provided" );
+ return;
+ }
+
+ String payload = constructPayload(queueName, queueType, executionAttributes);
+
+ try {
+ String stateMachineArn = getStateMachineARN();
+ String executionName = (queueType.equalsIgnoreCase("dedup") ? "D_" : "") + queueName + "_" + System.currentTimeMillis();
+ StartExecutionRequest startExecutionRequest = new StartExecutionRequest().withStateMachineArn(stateMachineArn)
+ .withInput(payload)
+ .withName(executionName);
+
+ StartExecutionResult startExecutionResult = stepFunctionsClient.startExecution(startExecutionRequest);
+
+ logger.info("Successfully started execution for state machine ARN: {}", stateMachineArn);
+ logger.debug("Execution ARN: {}", startExecutionResult.getExecutionArn());
+
+ } catch (StateMachineDoesNotExistException e) {
+ logger.error("State Machine does not exist for queue_type: " + queueType + ", queue_name: " + queueName, e);
+ } catch (InvalidArnException e) {
+ logger.error("Invalid ARN provided for queue_type: " + queueType + ", queue_name: " + queueName, e);
+ } catch (InvalidExecutionInputException e) {
+ logger.error("Invalid execution input provided: {}", payload, e);
+ } catch (AWSStepFunctionsException e) {
+ logger.error("Error executing Step Function: {}", e.getMessage(), e);
+ throw e; // Re-throw after logging
+ } catch (Exception e) {
+ logger.error("Unexpected error occurred during Step Function execution: {}", e.getMessage(), e);
+ throw e; // Re-throw unexpected exceptions
+ }
+ }
+
+
+ private String constructPayload(String queueName, String queueType, QueueExecutionAttributes executionAttributes) throws JsonProcessingException {
+
+ validateExecutionInputs(queueName, queueType, executionAttributes);
+ executionAttributes.setQueueType(queueType);
+ executionAttributes.setQueueName(queueName);
+
+ ExecutionInputWrapper executionInputWrapper = new ExecutionInputWrapper();
+ executionInputWrapper.setExecutionInput(executionAttributes);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.writeValueAsString(executionInputWrapper);
+ }
+
+
+ /**
+ * Starts the execution of a Step Function with the given state machine ARN and input payload.
+ *
+ * @param stateMachineArn ARN of the state machine
+ * @param inputPayload Input for the state machine execution
+ * @throws IllegalArgumentException If the stateMachineArn is invalid
+ */
+ public void startExecution(String stateMachineArn, String inputPayload, String executionName) {
+ if (stateMachineArn == null || stateMachineArn.isEmpty()) {
+ logger.error("State Machine ARN cannot be null or empty");
+ throw new IllegalArgumentException("State Machine ARN cannot be null or empty");
+ }
+
+ if (inputPayload == null) {
+ logger.warn("Input payload is null; using empty JSON object");
+ inputPayload = "{}"; // Default to empty payload if null
+ }
+
+ try {
+ StartExecutionRequest startExecutionRequest = new StartExecutionRequest()
+ .withStateMachineArn(stateMachineArn)
+ .withInput(inputPayload)
+ .withName(executionName);
+
+ StartExecutionResult startExecutionResult = stepFunctionsClient.startExecution(startExecutionRequest);
+
+ logger.info("Successfully started execution for state machine ARN: {}", stateMachineArn);
+ logger.debug("Execution ARN: {}", startExecutionResult.getExecutionArn());
+
+ } catch (Exception e) {
+ logger.error("Unexpected error occurred during Step Function execution: {}", e.getMessage(), e);
+ throw e;
+ }
+ }
+
+
+ /**
+ * Gets execution inputs of an active/running step-function associated with (queueType, queueName).
+ *
+ * @param queueType queueType
+ * @param queueName queueName
+ *
+ * @return valid QueueExecutionAttributes : if any active execution exists, else NULL.
+ *
+ * @throws JsonProcessingException If execution input attributes fails in getting converted to a valid execution payload json
+ */
+ public QueueExecutionAttributes getExistingSFNAttributes(String queueType, String queueName) throws JsonProcessingException {
+ try {
+ List<ExecutionListItem> executionItemList = getAllActiveExecutionArns();
+
+ for(ExecutionListItem executionItem : executionItemList) {
+ String executionARN = executionItem.getExecutionArn();
+
+ DescribeExecutionRequest describeExecutionRequest = new DescribeExecutionRequest().withExecutionArn(executionARN);
+ DescribeExecutionResult describeExecutionResult = stepFunctionsClient.describeExecution(describeExecutionRequest);
+
+ String existingInputPayload = describeExecutionResult.getInput();
+ QueueExecutionAttributes queueExecutionAttributes = new ObjectMapper().readValue(existingInputPayload, ExecutionInputWrapper.class).getExecutionInput();
+
+ if(queueExecutionAttributes.getQueueType() != null && queueExecutionAttributes.getQueueType().equals(queueType)
+ && queueExecutionAttributes.getQueueName() != null && queueExecutionAttributes.getQueueName().equals(queueName)) {
+ logger.info("Fetched attributes for executionArn: " + executionARN + " => " + queueExecutionAttributes);
+ return queueExecutionAttributes;
+ }
+ }
+
+ logger.info("No active executions found for queue_type: " + queueType + ", queue_name: " + queueName + " stateMachineARN: ");
+ return null;
+
+ } catch (Exception e) {
+ logger.error("Unexpected error in fetching sfn attributes for queue_type: " + queueType + ", queue_name: " + queueName);
+ throw e;
+ }
+ }
+
+
+ /**
+ * Stops an active execution of step-function associated with (queueType, queueName), if any.
+ *
+ * @param queueType queueType
+ * @param queueName queueName
+ *
+ * @throws JsonProcessingException if an active execution's input payload cannot be parsed.
+ */
+ public void stopActiveExecutions(String queueType, String queueName) throws JsonProcessingException {
+
+ try {
+ List<ExecutionListItem> executionItemList = getAllActiveExecutionArns();
+
+ for(ExecutionListItem executionItem : executionItemList) {
+ String executionARN = executionItem.getExecutionArn();
+
+ DescribeExecutionRequest describeExecutionRequest = new DescribeExecutionRequest().withExecutionArn(executionARN);
+ DescribeExecutionResult describeExecutionResult = stepFunctionsClient.describeExecution(describeExecutionRequest);
+
+ String existingInputPayload = describeExecutionResult.getInput();
+ QueueExecutionAttributes queueExecutionAttributes = new ObjectMapper().readValue(existingInputPayload, ExecutionInputWrapper.class).getExecutionInput();
+
+ if(queueExecutionAttributes.getQueueType() != null && queueExecutionAttributes.getQueueType().equals(queueType)
+ && queueExecutionAttributes.getQueueName() != null && queueExecutionAttributes.getQueueName().equals(queueName)) {
+ logger.info("Stopping active execution: " + executionARN);
+
+ StopExecutionRequest stopRequest = new StopExecutionRequest().withExecutionArn(executionARN);
+ stepFunctionsClient.stopExecution(stopRequest);
+
+ logger.info("Stopped execution: " + executionARN);
+ return;
+ }
+ }
+
+ logger.info("No active execution arn exists for queue_type:" + queueType + ", queue_name:" + queueName);
+
+ } catch (Exception e) {
+ logger.error("Failure in stopping active execution: {}", e.getMessage(), e);
+ throw e;
+ }
+ }
+
+
+ /**
+ * Gets all active/running executions of the configured state machine.
+ *
+ * @return the list of RUNNING executions (possibly empty).
+ */
+ public List<ExecutionListItem> getAllActiveExecutionArns() {
+
+ try {
+ String stateMachineArn = getStateMachineARN();
+
+ ListExecutionsRequest listExecutionRequest = new ListExecutionsRequest().withStateMachineArn(stateMachineArn)
+ .withStatusFilter(ExecutionStatus.RUNNING);
+
+ ListExecutionsResult listExecutionResults = stepFunctionsClient.listExecutions(listExecutionRequest);
+ return listExecutionResults.getExecutions();
+
+ } catch (Exception e) {
+ logger.error("Unexpected error: {" + e.getMessage() + "} occurred while fetching all active execution ARNs", e);
+ throw e;
+ }
+
+ }
+
+
+ /**
+ * Gets stateMachine ARN of a step-function from aws parameter-store.
+ *
+ * @return String: stateMachineArn
+ *
+ * @throws AWSStepFunctionsException if fetching the ARN from the AWS parameter store fails
+ * @throws MissingResourceException if the state machine ARN is not found/set in the AWS parameter store
+ */
+ public String getStateMachineARN() {
+
+ try {
+ // TODO_SHAN: Extend this fetch part later based on queueType : queue/dedup/databus
+ String stateMachineArn = _parameterStoreUtil.getParameter("/" + universe + "/emodb/stepfn/stateMachineArn");
+
+ if(stateMachineArn != null && !stateMachineArn.isEmpty()) {
+ return stateMachineArn;
+ }
+ } catch (Exception e) {
+ throw new AWSStepFunctionsException("Problem fetching state machine arn");
+ }
+
+ throw new MissingResourceException("state machine arn not found in param-store", "", "");
+ }
+
+
+ private void syncFreshAttributesFromExistingExecution(QueueExecutionAttributes newQueueExecutionAttributes, QueueExecutionAttributes existingExecutionAttributes) {
+
+ validateExecutionInputs(existingExecutionAttributes.getQueueType(), existingExecutionAttributes.getQueueName(), existingExecutionAttributes);
+
+ if(newQueueExecutionAttributes == null) {
+ newQueueExecutionAttributes = new QueueExecutionAttributes();
+ }
+
+ if(newQueueExecutionAttributes.getQueueType() == null || newQueueExecutionAttributes.getQueueType().isEmpty()) {
+ newQueueExecutionAttributes.setQueueType(existingExecutionAttributes.getQueueType());
+ }
+
+ if(newQueueExecutionAttributes.getQueueName() == null || newQueueExecutionAttributes.getQueueName().isEmpty()) {
+ newQueueExecutionAttributes.setQueueName(existingExecutionAttributes.getQueueName());
+ }
+
+ if(newQueueExecutionAttributes.getQueueThreshold() == null) {
+ newQueueExecutionAttributes.setQueueThreshold(existingExecutionAttributes.getQueueThreshold());
+ }
+
+ if(newQueueExecutionAttributes.getBatchSize() == null) {
+ newQueueExecutionAttributes.setBatchSize(existingExecutionAttributes.getBatchSize());
+ }
+
+ if(newQueueExecutionAttributes.getInterval() == null) {
+ newQueueExecutionAttributes.setInterval(existingExecutionAttributes.getInterval());
+ }
+
+ if(newQueueExecutionAttributes.getTopicName() == null || newQueueExecutionAttributes.getTopicName().isEmpty()) {
+ newQueueExecutionAttributes.setTopicName(existingExecutionAttributes.getTopicName());
+ }
+
+ if(newQueueExecutionAttributes.getStatus() == null || newQueueExecutionAttributes.getStatus().isEmpty()) {
+ newQueueExecutionAttributes.setStatus(existingExecutionAttributes.getStatus());
+ }
+
+ validateExecutionInputs(newQueueExecutionAttributes.getQueueType(), newQueueExecutionAttributes.getQueueName(), newQueueExecutionAttributes);
+
+ }
+
+ private void validateExecutionInputs(String queueType, String queueName, QueueExecutionAttributes executionAttributes) {
+ if(queueName == null || queueName.isEmpty()) {
+ throw new IllegalArgumentException("queue name can't be null/empty");
+ }
+
+ if(queueType == null || queueType.isEmpty()) {
+ throw new IllegalArgumentException("queue type can't be null/empty");
+ }
+
+ if(executionAttributes == null) {
+ throw new IllegalArgumentException("execution attributes can't be null");
+ }
+
+ if(executionAttributes.getTopicName() == null || executionAttributes.getTopicName().isEmpty()) {
+ throw new IllegalArgumentException("topic name can't be null/empty");
+ }
+
+ if(executionAttributes.getInterval() == null) {
+ throw new IllegalArgumentException("interval can't be null");
+ }
+
+ if(executionAttributes.getBatchSize() == null) {
+ throw new IllegalArgumentException("batch size can't be null");
+ }
+
+ if(executionAttributes.getQueueThreshold() == null) {
+ throw new IllegalArgumentException("queue threshold can't be null");
+ }
+ }
+}
\ No newline at end of file
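
Putting the service's public methods together, a caller would typically resolve the state machine ARN, check for running executions, and only then start a new one. A hedged usage sketch; the input payload and execution name below are made-up values:

    StepFunctionService sfn = new StepFunctionService();
    String stateMachineArn = sfn.getStateMachineARN();
    if (sfn.getAllActiveExecutionArns().isEmpty()) {
        // Hypothetical attributes payload; real callers serialize QueueExecutionAttributes.
        String input = "{\"queueName\":\"my-queue\",\"batchSize\":100}";
        sfn.startExecution(stateMachineArn, input, "my-queue-execution");
    }
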
diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java
index 9263d33ef4..98b4d5870d 100644
--- a/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java
+++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/QueueModuleTest.java
@@ -13,6 +13,9 @@
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.queue.api.DedupQueueService;
import com.bazaarvoice.emodb.queue.api.QueueService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.bazaarvoice.ostrich.HostDiscovery;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
index e6f66a5c0f..0ac9a0ed89 100644
--- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
+++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
@@ -4,6 +4,10 @@
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.job.api.JobType;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.testng.annotations.Test;
import java.time.Clock;
@@ -37,7 +41,7 @@ public void testSizeCache() {
BaseEventStore mockEventStore = mock(BaseEventStore.class);
AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class),
- mock(JobHandlerRegistry.class), mock(JobType.class), clock){};
+ mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){};
// At limit=500, size estimate should be at 4800
// At limit=50, size estimate should be at 5000
diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionServiceTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionServiceTest.java
new file mode 100644
index 0000000000..136f8d5db4
--- /dev/null
+++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionServiceTest.java
@@ -0,0 +1,199 @@
+package com.bazaarvoice.emodb.queue.core.stepfn;
+
+import com.amazonaws.services.stepfunctions.AWSStepFunctions;
+import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
+import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+public class StepFunctionServiceTest {
+
+ private StepFunctionService stepFunctionService;
+
+ @Mock
+ private AWSStepFunctions mockStepFunctionsClient;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ MockitoAnnotations.openMocks(this);
+ stepFunctionService = new StepFunctionService();
+
+ // Use reflection to set the private field stepFunctionsClient
+ Field field = StepFunctionService.class.getDeclaredField("stepFunctionsClient");
+ field.setAccessible(true); // Make the private field accessible
+ field.set(stepFunctionService, mockStepFunctionsClient); // Inject mock
+ }
+
+ @Test
+ public void testStartExecution_withValidParameters() {
+ // Arrange
+ String stateMachineArn = "arn:aws:states:us-east-1:123456789012:stateMachine:exampleStateMachine";
+ String inputPayload = "{\"key\":\"value\"}";
+ String executionName = "testExecution";
+
+ StartExecutionResult mockResult = new StartExecutionResult()
+ .withExecutionArn("arn:aws:states:us-east-1:123456789012:execution:exampleExecution");
+ when(mockStepFunctionsClient.startExecution(any(StartExecutionRequest.class))).thenReturn(mockResult);
+
+ // Act
+ stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
+
+ // Assert
+ ArgumentCaptor<StartExecutionRequest> requestCaptor = ArgumentCaptor.forClass(StartExecutionRequest.class);
+ verify(mockStepFunctionsClient).startExecution(requestCaptor.capture());
+
+ StartExecutionRequest request = requestCaptor.getValue();
+ assertEquals(request.getStateMachineArn(), stateMachineArn);
+ assertEquals(request.getInput(), inputPayload);
+ //assertTrue(request.getName().startsWith("testExecution_"));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "State Machine ARN cannot be null or empty")
+ public void testStartExecution_withNullStateMachineArn() {
+ // Arrange
+ String stateMachineArn = null;
+ String inputPayload = "{\"key\":\"value\"}";
+ String executionName = "testExecution";
+
+ // Act
+ stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "State Machine ARN cannot be null or empty")
+ public void testStartExecution_withEmptyStateMachineArn() {
+ // Arrange
+ String stateMachineArn = "";
+ String inputPayload = "{\"key\":\"value\"}";
+ String executionName = "testExecution";
+
+ // Act
+ stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
+ }
+
+ @Test
+ public void testStartExecution_withNullInputPayload() {
+ // Arrange
+ String stateMachineArn = "arn:aws:states:us-east-1:123456789012:stateMachine:exampleStateMachine";
+ String executionName = "testExecution";
+
+ StartExecutionResult mockResult = new StartExecutionResult()
+ .withExecutionArn("arn:aws:states:us-east-1:123456789012:execution:exampleExecution");
+ when(mockStepFunctionsClient.startExecution(any(StartExecutionRequest.class))).thenReturn(mockResult);
+
+ // Act
+ stepFunctionService.startExecution(stateMachineArn, null, executionName);
+
+ // Assert
+ ArgumentCaptor<StartExecutionRequest> requestCaptor = ArgumentCaptor.forClass(StartExecutionRequest.class);
+ verify(mockStepFunctionsClient).startExecution(requestCaptor.capture());
+
+ StartExecutionRequest request = requestCaptor.getValue();
+ assertEquals(request.getStateMachineArn(), stateMachineArn);
+ assertEquals(request.getInput(), "{}"); // Default to empty payload
+ }
+
+ @Test
+ public void testSanitizeExecutionName_withInvalidCharacters() {
+ // Arrange
+ String invalidExecutionName = "test/execution:name*with?invalid|characters";
+
+ // Act
+ String sanitized = stepFunctionService.sanitizeExecutionName(invalidExecutionName);
+
+ // Assert
+ assertEquals(sanitized, "test_execution_name_with_invalid_characters");
+ }
+
+ @Test
+ public void testSanitizeExecutionName_withTooLongName() {
+ // Arrange
+ String longExecutionName = "ThisIsAVeryLongExecutionNameThatExceedsTheMaximumAllowedLengthOfSixtyNineCharactersAndShouldBeTruncatedAtSomePoint";
+
+ // Act
+ String sanitized = stepFunctionService.sanitizeExecutionName(longExecutionName);
+
+ // Assert
+ assertTrue(sanitized.length() <= 69);
+ }
+
+ // New Test Cases for Edge Cases
+
+ @Test
+ public void testSanitizeExecutionName_withValidName() {
+ // Arrange
+ String validExecutionName = "validExecutionName";
+
+ // Act
+ String sanitized = stepFunctionService.sanitizeExecutionName(validExecutionName);
+
+ // Print the output
+ System.out.println("Sanitized Execution Name: " + sanitized);
+
+ // Assert
+ assertEquals(sanitized, validExecutionName); // Should return the same name
+ }
+
+ @Test
+ public void testSanitizeExecutionName_withLeadingAndTrailingSpaces() {
+ // Arrange
+ String executionName = " executionName ";
+
+ // Act
+ String sanitized = stepFunctionService.sanitizeExecutionName(executionName);
+
+ // Print the output
+ System.out.println("Sanitized Execution Name: " + sanitized);
+
+ // Assert
+ assertEquals(sanitized, "executionName"); // Should trim spaces
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Execution name cannot contain only invalid characters")
+ public void testSanitizeExecutionName_withOnlyInvalidCharacters() {
+ // Arrange
+ String invalidOnly = "*/?|<>"; // Input with only invalid characters
+
+ stepFunctionService.sanitizeExecutionName(invalidOnly);
+ }
+
+
+ @Test
+ public void testSanitizeExecutionName_withMaximumLength() {
+ // Arrange
+ String maxLengthExecutionName = "ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEDHDFHDFHHFCN"; // 69 characters
+
+ // Act
+ String sanitized = stepFunctionService.sanitizeExecutionName(maxLengthExecutionName);
+
+ // Print the output
+ System.out.println("Sanitized Execution Name: " + sanitized);
+
+ // Assert
+ assertEquals(sanitized.length(), 66); // Should be exactly 66 characters
+ }
+
+ @Test
+ public void testSanitizeExecutionName_withMultipleInvalidCharacters() {
+ // Arrange
+ String executionName = "test//?invalid//name?with*multiple|invalid:characters";
+
+ // Act
+ String sanitized = stepFunctionService.sanitizeExecutionName(executionName);
+
+ // Print the output
+ System.out.println("Sanitized Execution Name: " + sanitized);
+
+ // Assert
+ assertEquals(sanitized, "test___invalid__name_with_multiple_invalid_characters"); // Should replace all invalid characters
+ }
+}
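
The tests above pin down the contract for sanitizeExecutionName: trim whitespace, replace each invalid character with a single underscore, reject inputs made up only of invalid characters, and truncate long names (to 66 characters, presumably leaving room for a generated suffix). A minimal implementation sketch consistent with these assertions; the actual production method may differ:

    public String sanitizeExecutionName(String name) {
        String trimmed = name.trim();
        // Reject names that contain nothing but characters we would strip.
        if (trimmed.matches("[/:*?|<>\"]+")) {
            throw new IllegalArgumentException("Execution name cannot contain only invalid characters");
        }
        // Replace each character that is invalid in a Step Functions execution name.
        String sanitized = trimmed.replaceAll("[/:*?|<>\"]", "_");
        // Truncate to 66 chars, matching the length assertions in the tests above.
        return sanitized.length() > 66 ? sanitized.substring(0, 66) : sanitized;
    }
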
diff --git a/sdk/pom.xml b/sdk/pom.xml
index d646097d3b..81b0ee0b03 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/sor-api/pom.xml b/sor-api/pom.xml
index 7888699c32..81e16753c4 100644
--- a/sor-api/pom.xml
+++ b/sor-api/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java
index 7a00eedf9f..3e7676659c 100644
--- a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java
+++ b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/DataStore.java
@@ -261,4 +261,11 @@ void dropFacade(String table, String placement, Audit audit)
*/
URI getStashRoot()
throws StashNotAvailableException;
+
+ /**
+ * Writes update references (serialized as JSON strings) back onto the Databus.
+ * This method is a no-op in the default implementation.
+ */
+ default void updateRefInDatabus(List<String> updateRefs) {
+
+ }
}
diff --git a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java
index 34d020b1fe..9fb41535a9 100644
--- a/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java
+++ b/sor-api/src/main/java/com/bazaarvoice/emodb/sor/api/Names.java
@@ -19,4 +19,6 @@ public static boolean isLegalTableAttributeName(String attributeName) {
// The attributes should not start with "~" which is reserved for Emodb's internal use
return !attributeName.startsWith("~");
}
+
+
}
diff --git a/sor-client-common/pom.xml b/sor-client-common/pom.xml
index 30389fd75f..cf1a7df1b5 100644
--- a/sor-client-common/pom.xml
+++ b/sor-client-common/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/sor-client-jersey2/pom.xml b/sor-client-jersey2/pom.xml
index 406e66feed..15f4ca4283 100644
--- a/sor-client-jersey2/pom.xml
+++ b/sor-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/sor-client/pom.xml b/sor-client/pom.xml
index 8e2ce4803e..9493c37ff1 100644
--- a/sor-client/pom.xml
+++ b/sor-client/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/sor/pom.xml b/sor/pom.xml
index 18b6158eab..fb2ffa746c 100644
--- a/sor/pom.xml
+++ b/sor/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
@@ -348,5 +348,13 @@
<artifactId>testng</artifactId>
<scope>test</scope>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-ssm</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java
index fda127a190..6c3958b265 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/DataStoreModule.java
@@ -60,6 +60,7 @@
import com.bazaarvoice.emodb.sor.db.cql.CqlForMultiGets;
import com.bazaarvoice.emodb.sor.db.cql.CqlForScans;
import com.bazaarvoice.emodb.sor.db.cql.SorCqlSettingsTask;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.LogbackSlowQueryLogProvider;
import com.bazaarvoice.emodb.sor.log.SlowQueryLog;
import com.bazaarvoice.emodb.sor.log.SlowQueryLogConfiguration;
@@ -195,6 +196,7 @@ protected void configure() {
bind(SlowQueryLog.class).toProvider(LogbackSlowQueryLogProvider.class);
bind(HintsConsistencyTimeProvider.class).asEagerSingleton();
bind(MinLagConsistencyTimeProvider.class).asEagerSingleton();
+ bind(KafkaProducerService.class).asEagerSingleton();
// The web servers are responsible for updating the ZooKeeper full consistency data. CLI tools don't need to.
// Enable updating the ZooKeeper full consistency data if specified
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DataStoreProviderProxy.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DataStoreProviderProxy.java
index 607b27d2de..bc67e58e78 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DataStoreProviderProxy.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DataStoreProviderProxy.java
@@ -208,4 +208,9 @@ public void dropFacade(String table, String placement, Audit audit) throws Unkno
public URI getStashRoot() throws StashNotAvailableException {
return _local.get().getStashRoot();
}
+
+ @Override
+ public void updateRefInDatabus(List<String> updateRefs) {
+ _local.get().updateRefInDatabus(updateRefs);
+ }
}
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java
index dadbdd3b4e..5cb454b161 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/DefaultDataStore.java
@@ -5,29 +5,7 @@
import com.bazaarvoice.emodb.common.json.deferred.LazyJsonMap;
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.common.zookeeper.store.MapStore;
-import com.bazaarvoice.emodb.sor.api.Audit;
-import com.bazaarvoice.emodb.sor.api.AuditBuilder;
-import com.bazaarvoice.emodb.sor.api.AuditsUnavailableException;
-import com.bazaarvoice.emodb.sor.api.Change;
-import com.bazaarvoice.emodb.sor.api.CompactionControlSource;
-import com.bazaarvoice.emodb.sor.api.Coordinate;
-import com.bazaarvoice.emodb.sor.api.DataStore;
-import com.bazaarvoice.emodb.sor.api.DefaultTable;
-import com.bazaarvoice.emodb.sor.api.FacadeOptions;
-import com.bazaarvoice.emodb.sor.api.History;
-import com.bazaarvoice.emodb.sor.api.Intrinsic;
-import com.bazaarvoice.emodb.sor.api.Names;
-import com.bazaarvoice.emodb.sor.api.ReadConsistency;
-import com.bazaarvoice.emodb.sor.api.StashNotAvailableException;
-import com.bazaarvoice.emodb.sor.api.StashRunTimeInfo;
-import com.bazaarvoice.emodb.sor.api.StashTimeKey;
-import com.bazaarvoice.emodb.sor.api.TableOptions;
-import com.bazaarvoice.emodb.sor.api.UnknownPlacementException;
-import com.bazaarvoice.emodb.sor.api.UnknownTableException;
-import com.bazaarvoice.emodb.sor.api.UnpublishedDatabusEvent;
-import com.bazaarvoice.emodb.sor.api.UnpublishedDatabusEventType;
-import com.bazaarvoice.emodb.sor.api.Update;
-import com.bazaarvoice.emodb.sor.api.WriteConsistency;
+import com.bazaarvoice.emodb.sor.api.*;
import com.bazaarvoice.emodb.sor.audit.AuditWriter;
import com.bazaarvoice.emodb.sor.compactioncontrol.LocalCompactionControl;
import com.bazaarvoice.emodb.sor.condition.Condition;
@@ -42,7 +20,10 @@
import com.bazaarvoice.emodb.sor.db.ScanRange;
import com.bazaarvoice.emodb.sor.db.ScanRangeSplits;
import com.bazaarvoice.emodb.sor.delta.Delta;
+import com.bazaarvoice.emodb.sor.kafka.KafkaConfig;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.SlowQueryLog;
+import com.bazaarvoice.emodb.sor.ssm.ParameterStoreUtil;
import com.bazaarvoice.emodb.table.db.DroppedTableException;
import com.bazaarvoice.emodb.table.db.StashBlackListTableCondition;
import com.bazaarvoice.emodb.table.db.StashTableDAO;
@@ -55,10 +36,14 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
@@ -79,21 +64,14 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -104,6 +82,8 @@ public class DefaultDataStore implements DataStore, DataProvider, DataTools, Tab
private static final int NUM_COMPACTION_THREADS = 2;
private static final int MAX_COMPACTION_QUEUE_LENGTH = 100;
+ private static final String MASTER_FANOUT_TOPIC = "system_bus_master";
+ private static final String DATA_THROTTLER = "databusThrottler";
private final Logger _log = LoggerFactory.getLogger(DefaultDataStore.class);
@@ -126,6 +106,11 @@ public class DefaultDataStore implements DataStore, DataProvider, DataTools, Tab
private final CompactionControlSource _compactionControlSource;
private final MapStore _minSplitSizeMap;
private final Clock _clock;
+ private final KafkaProducerService _kafkaProducerService;
+ private ParameterStoreUtil parameterStoreUtil;
+ private final Cache<String, Boolean> dataThrottlerCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .build();
private StashTableDAO _stashTableDao;
@@ -134,10 +119,10 @@ public DefaultDataStore(LifeCycleRegistry lifeCycle, MetricRegistry metricRegist
DataReaderDAO dataReaderDao, DataWriterDAO dataWriterDao, SlowQueryLog slowQueryLog, HistoryStore historyStore,
@StashRoot Optional stashRootDirectory, @LocalCompactionControl CompactionControlSource compactionControlSource,
@StashBlackListTableCondition Condition stashBlackListTableCondition, AuditWriter auditWriter,
- @MinSplitSizeMap MapStore minSplitSizeMap, Clock clock) {
+ @MinSplitSizeMap MapStore minSplitSizeMap, Clock clock, KafkaProducerService kafkaProducerService) {
this(eventWriterRegistry, tableDao, dataReaderDao, dataWriterDao, slowQueryLog, defaultCompactionExecutor(lifeCycle),
historyStore, stashRootDirectory, compactionControlSource, stashBlackListTableCondition, auditWriter,
- minSplitSizeMap, metricRegistry, clock);
+ minSplitSizeMap, metricRegistry, clock, kafkaProducerService);
}
@VisibleForTesting
@@ -146,7 +131,7 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
SlowQueryLog slowQueryLog, ExecutorService compactionExecutor, HistoryStore historyStore,
Optional stashRootDirectory, CompactionControlSource compactionControlSource,
Condition stashBlackListTableCondition, AuditWriter auditWriter,
- MapStore minSplitSizeMap, MetricRegistry metricRegistry, Clock clock) {
+ MapStore minSplitSizeMap, MetricRegistry metricRegistry, Clock clock, KafkaProducerService kafkaProducerService) {
_eventWriterRegistry = requireNonNull(eventWriterRegistry, "eventWriterRegistry");
_tableDao = requireNonNull(tableDao, "tableDao");
_dataReaderDao = requireNonNull(dataReaderDao, "dataReaderDao");
@@ -166,6 +151,8 @@ public DefaultDataStore(DatabusEventWriterRegistry eventWriterRegistry,TableDAO
_compactionControlSource = requireNonNull(compactionControlSource, "compactionControlSource");
_minSplitSizeMap = requireNonNull(minSplitSizeMap, "minSplitSizeMap");
_clock = requireNonNull(clock, "clock");
+ _kafkaProducerService = requireNonNull(kafkaProducerService, "kafkaProducerService");
+ this.parameterStoreUtil = new ParameterStoreUtil();
}
/**
@@ -376,6 +363,35 @@ public AnnotatedContent apply(Record record) {
};
}
+ /**
+ * Retrieves the value of the "databusThrottler" flag from the cache if available.
+ * If the value is not present in the cache or the cache has expired, it fetches the value
+ * from AWS Parameter Store and stores it in the cache.
+ *
+ * The cached value has a TTL (Time-To-Live) of 1 minute, after which it will be refreshed
+ * from the Parameter Store on the next access.
+ *
+ * @return {@code true} if databus writes should be routed through Kafka, otherwise {@code false};
+ * falls back to {@code false} if the flag cannot be fetched.
+ */
+ private boolean getDataThrottlerValue() {
+ try {
+ String universe = KafkaConfig.getUniverseFromEnv();
+ // Attempt to retrieve from cache
+ return dataThrottlerCache.get(DATA_THROTTLER, () -> {
+
+ Boolean checkDataThrottler = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + universe + "/emodb/" + DATA_THROTTLER));
+ _log.info("DATA_THROTTLER is refreshed {}", checkDataThrottler);
+ // If absent or expired, fetch from Parameter Store and cache the result
+ return checkDataThrottler;
+ });
+ } catch (Exception e) {
+ _log.error("Error fetching databusThrottler value: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+
/**
* Resolve a set of changes read from the {@link DataWriterDAO} into a single JSON literal object + metadata.
* If the record can be compacted an asynchronous compaction will be scheduled unless
@@ -744,7 +760,10 @@ public void beforeWrite(Collection updateBatch) {
}
}
if (!updateRefs.isEmpty()) {
- _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefs);
+ if (getDataThrottlerValue()) {
+ _kafkaProducerService.sendMessages(MASTER_FANOUT_TOPIC, updateRefs, "update");
+ } else {
+ _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefs);
+ }
}
}
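
With this gate in place, routing is controlled at runtime by the databusThrottler flag in Parameter Store, picked up within the one-minute cache TTL. A sketch of flipping the flag with the ParameterStoreUtil introduced in this change, where universe stands for the environment name read from /etc/environment:

    ParameterStoreUtil util = new ParameterStoreUtil();
    // Route writes through Kafka (system_bus_master) instead of the databus writer.
    util.updateParameter("/" + universe + "/emodb/databusThrottler", "true");
    // Roll back to direct databus writes.
    util.updateParameter("/" + universe + "/emodb/databusThrottler", "false");
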
@@ -1025,4 +1044,26 @@ private void decrementDeltaSizes(PendingCompaction pendingCompaction) {
private String getMetricName(String name) {
return MetricRegistry.name("bv.emodb.sor", "DefaultDataStore", name);
}
+
+ @Override
+ public void updateRefInDatabus(List<String> updateRefsModel) {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ List<UpdateRef> updateRefModelList = updateRefsModel.stream()
+ .map(string -> {
+ try {
+ return objectMapper.readValue(string, UpdateRef.class);
+ } catch (JsonProcessingException e) {
+ _log.error("Error in parsing the message for {}", string, e);
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ if (!updateRefModelList.isEmpty()) {
+ _eventWriterRegistry.getDatabusWriter().writeEvents(updateRefModelList);
+ _log.info("Successfully wrote {} messages to databus", updateRefModelList.size());
+ }
+ } catch (Exception e) {
+ _log.error("Error in writing updateRef to databus", e);
+ }
+ }
}
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/UpdateRef.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/UpdateRef.java
index a7cb0fc008..7e78d9646e 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/UpdateRef.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/UpdateRef.java
@@ -1,8 +1,12 @@
package com.bazaarvoice.emodb.sor.core;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
import static java.util.Objects.hash;
import static java.util.Objects.requireNonNull;
@@ -17,7 +21,9 @@ public final class UpdateRef {
private final UUID _changeId;
private final Set<String> _tags;
- public UpdateRef(String table, String key, UUID changeId, Set<String> tags) {
+
+ @JsonCreator
+ public UpdateRef(@JsonProperty("table") String table, @JsonProperty("key") String key, @JsonProperty("changeId") UUID changeId, @JsonProperty("tags") Set<String> tags) {
_table = requireNonNull(table, "table");
_key = requireNonNull(key, "key");
_changeId = requireNonNull(changeId, "changeId");
@@ -60,4 +66,13 @@ public int hashCode() {
return hash(_table, _key, _changeId, _tags);
}
+ @Override
+ public String toString() {
+ return "{" +
+ "\"table\":\"" + _table + "\"" +
+ ",\"key\":\"" + _key + "\"" +
+ ",\"changeId\":\"" + _changeId +"\""+
+ ",\"tags\":" + _tags.stream() .map(item -> "\"" + item + "\"") .collect(Collectors.toSet()) +
+ "}";
+ }
}
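
The toString() above emits the same JSON shape that the @JsonCreator constructor consumes, which is what allows updateRefInDatabus to deserialize Kafka payloads back into UpdateRef objects. A round-trip sketch with illustrative values:

    UpdateRef ref = new UpdateRef("review:testcustomer", "key1",
            UUID.randomUUID(), Collections.singleton("tag1"));
    String json = ref.toString();
    UpdateRef parsed = new ObjectMapper().readValue(json, UpdateRef.class);
    // parsed.equals(ref) holds as long as table/key/tags need no JSON escaping,
    // since toString() does not escape its values.
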
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/WriteCloseableDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/WriteCloseableDataStore.java
index 784e2c5fba..adcba12d6f 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/WriteCloseableDataStore.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/WriteCloseableDataStore.java
@@ -297,4 +297,9 @@ public Collection getTablePlacements() {
public URI getStashRoot() throws StashNotAvailableException {
return _delegate.getStashRoot();
}
+
+ @Override
+ public void updateRefInDatabus(List<String> updateRefsModel) {
+ _delegate.updateRefInDatabus(updateRefsModel);
+ }
}
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
index 37348e976b..8f6d9da161 100644
--- a/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/core/test/InMemoryDataStore.java
@@ -6,6 +6,7 @@
import com.bazaarvoice.emodb.sor.core.DatabusEventWriterRegistry;
import com.bazaarvoice.emodb.sor.core.DefaultDataStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
import com.codahale.metrics.MetricRegistry;
@@ -19,18 +20,19 @@
*/
public class InMemoryDataStore extends DefaultDataStore {
- public InMemoryDataStore(MetricRegistry metricRegistry) {
- this(new InMemoryDataReaderDAO(), metricRegistry);
+ public InMemoryDataStore(MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
+ this(new InMemoryDataReaderDAO(), metricRegistry, kafkaProducerService);
}
- public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry) {
- this(new DatabusEventWriterRegistry(), dataDao, metricRegistry);
+
+ public InMemoryDataStore(InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
+ this(new DatabusEventWriterRegistry(), dataDao, metricRegistry, kafkaProducerService);
}
- public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry) {
+ public InMemoryDataStore(DatabusEventWriterRegistry eventWriterRegistry, InMemoryDataReaderDAO dataDao, MetricRegistry metricRegistry, KafkaProducerService kafkaProducerService) {
super(eventWriterRegistry, new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), kafkaProducerService);
}
}
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java
new file mode 100644
index 0000000000..a6518246b2
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaConfig.java
@@ -0,0 +1,148 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.Parameter;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class KafkaConfig {
+ private static String bootstrapServersConfig;
+ private static String batchSizeConfig;
+ private static String retriesConfig;
+ private static String lingerMsConfig;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+ // Static SSM Client and configuration using AWS SDK v1
+ private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder
+ .standard()
+ .build();
+
+
+ static {
+ try {
+ final String UNIVERSE = getUniverseFromEnv();
+ // Load configurations from SSM during static initialization
+ Map<String, String> parameterValues = getParameterValues(
+ Arrays.asList(
+ "/" + UNIVERSE + "/emodb/kafka/batchSize",
+ "/" + UNIVERSE + "/emodb/kafka/retries",
+ "/" + UNIVERSE + "/emodb/kafka/lingerMs",
+ "/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
+ )
+ );
+
+ // Set configurations with fallback to defaults if not present
+ // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
+ batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
+
+ // Sets the number of retry attempts for failed Kafka message sends.
+ retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
+
+ // Sets the number of milliseconds a producer is willing to wait before sending a batch out
+ lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
+
+ // Configures the Kafka broker addresses for producer connections.
+ bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", "localhost:9092");
+
+ logger.info("Kafka configurations loaded successfully from SSM.");
+ } catch (AmazonServiceException e) {
+ logger.error("Failed to load configurations from SSM; rethrowing.", e);
+ throw e;
+ } catch (Exception e) {
+ logger.error("Unexpected error occurred while loading configurations from SSM; rethrowing.", e);
+ throw e;
+ }
+ }
+
+ public static String getUniverseFromEnv() {
+ String filePath = "/etc/environment";
+ logger.info("Reading environment file: " + filePath);
+ Properties environmentProps = new Properties();
+
+ try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ // Skip empty lines or comments
+ if (line.trim().isEmpty() || line.trim().startsWith("#")) {
+ continue;
+ }
+ // Split the line into key-value pair
+ String[] parts = line.split("=", 2);
+ logger.info("parts: " + Arrays.toString(parts));
+ if (parts.length == 2) {
+ String key = parts[0].trim();
+ String value = parts[1].trim();
+ // Remove any surrounding quotes from value
+ value = value.replace("\"", "");
+ environmentProps.put(key, value);
+ }
+ }
+ // Access the environment variables
+ return environmentProps.getProperty("UNIVERSE");
+ } catch (IOException e) {
+ logger.error("Error reading environment file: " + e.getMessage());
+ throw new RuntimeException("Error reading environment file: " + e.getMessage());
+ }
+ }
+ // Fetch parameters from AWS SSM using AWS SDK v1
+ private static Map<String, String> getParameterValues(List<String> parameterNames) {
+ try {
+ GetParametersRequest request = new GetParametersRequest()
+ .withNames(parameterNames)
+ .withWithDecryption(true);
+
+ GetParametersResult response = ssmClient.getParameters(request);
+
+ return response.getParameters().stream()
+ .collect(Collectors.toMap(Parameter::getName, Parameter::getValue));
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from SSM.", e);
+ throw e; // Rethrow or handle the exception if necessary
+ }
+ }
+
+ // Kafka Producer properties
+ public static Properties getProducerProps() {
+ Properties producerProps = new Properties();
+
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig));
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig));
+ producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig));
+ producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting
+ logger.info("Kafka Producer properties initialized.");
+ return producerProps;
+ }
+
+ // Ensure the SSM client is closed when the application shuts down
+ public static void shutdown() {
+ if (ssmClient != null) {
+ try {
+ ssmClient.shutdown();
+ logger.info("SSM client closed successfully.");
+ } catch (Exception e) {
+ logger.error("Error while closing SSM client.", e);
+ }
+ }
+ }
+}
\ No newline at end of file
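
A short usage sketch for KafkaConfig: the static initializer loads the SSM-backed settings once per JVM, after which getProducerProps() can feed a producer directly. The topic name below is the MASTER_FANOUT_TOPIC constant used elsewhere in this change; the message value is illustrative:

    KafkaProducer<String, String> producer = new KafkaProducer<>(KafkaConfig.getProducerProps());
    producer.send(new ProducerRecord<>("system_bus_master", "{\"table\":\"t\",\"key\":\"k\"}"));
    producer.close();
    KafkaConfig.shutdown(); // release the shared SSM client at application shutdown
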
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java
new file mode 100644
index 0000000000..4cb587cf29
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/kafka/KafkaProducerService.java
@@ -0,0 +1,66 @@
+package com.bazaarvoice.emodb.sor.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+
+public class KafkaProducerService {
+ private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class);
+ private final KafkaProducer<String, String> producer;
+
+ public KafkaProducerService() {
+ this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps());
+ _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps());
+ }
+
+ /**
+ * Sends each message from the collection to the specified Kafka topic separately.
+ *
+ * @param topic The Kafka topic.
+ * @param events The collection of messages to be sent.
+ * @param queueType The originating queue type; currently unused in this method.
+ */
+ public <T> void sendMessages(String topic, Collection<T> events, String queueType) {
+ LocalDateTime startTime = LocalDateTime.now();
+ _log.info("Sending {} messages to topic '{}'", events.size(), topic);
+ List<Future<RecordMetadata>> futures = new ArrayList<>();
+ // Use async sendMessage and collect futures
+ for (T event : events) {
+ futures.add(producer.send(new ProducerRecord<>(topic, event.toString())));
+ }
+
+ // Wait for all futures to complete
+ for (Future<RecordMetadata> future : futures) {
+ try {
+ future.get(); // Only blocks if a future is not yet complete
+ } catch (Exception e) {
+ _log.error("Error while sending message to Kafka: {}", e.getMessage());
+ throw new RuntimeException("Error sending messages to Kafka", e);
+ }
+ }
+ _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime, LocalDateTime.now()).toMillis());
+ }
+
+
+ /**
+ * Closes the producer to release resources.
+ */
+ public void close() {
+ _log.info("Closing Kafka producer.");
+ try {
+ producer.flush();
+ producer.close();
+ } catch (Exception e) {
+ _log.error("Error while closing Kafka producer: ", e);
+ throw e;
+ }
+ }
+}
\ No newline at end of file
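
Usage sketch for the service above. Events are serialized with toString(), which is why UpdateRef.toString() emits JSON, and sendMessages blocks until every record is acknowledged, so a failure surfaces to the caller as a RuntimeException:

    KafkaProducerService service = new KafkaProducerService();
    // ref: an UpdateRef built as in the round-trip sketch earlier
    service.sendMessages("system_bus_master", Collections.singletonList(ref), "update");
    service.close();
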
diff --git a/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java
new file mode 100644
index 0000000000..693901b264
--- /dev/null
+++ b/sor/src/main/java/com/bazaarvoice/emodb/sor/ssm/ParameterStoreUtil.java
@@ -0,0 +1,120 @@
+package com.bazaarvoice.emodb.sor.ssm;
+
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for interacting with AWS Parameter Store using AWS SDK v1.
+ */
+public class ParameterStoreUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class);
+ private final AWSSimpleSystemsManagement ssmClient;
+
+ /**
+ * Constructor to initialize the SSM client
+ */
+ public ParameterStoreUtil() {
+ // Create SSM client with default credentials and region
+ ssmClient = AWSSimpleSystemsManagementClientBuilder.standard()
+ .build();
+ }
+
+ /**
+ * Fetches a parameter from AWS Parameter Store.
+ *
+ * @param parameterName The name of the parameter to fetch
+ * @return The value of the parameter
+ * @throws IllegalArgumentException If the parameterName is null or empty
+ */
+ public String getParameter(String parameterName) {
+ if (parameterName == null || parameterName.isEmpty()) {
+ logger.error("Parameter name cannot be null or empty");
+ throw new IllegalArgumentException("Parameter name cannot be null or empty");
+ }
+
+ try {
+
+ GetParameterRequest request = new GetParameterRequest().withName(parameterName);
+ GetParameterResult result = ssmClient.getParameter(request);
+ return result.getParameter().getValue();
+
+ } catch (ParameterNotFoundException e) {
+ logger.error("Parameter not found: {}", parameterName, e);
+ throw new RuntimeException("Parameter not found: " + parameterName, e);
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameter: {}", parameterName, e);
+ throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e);
+ }
+ }
+
+ /**
+ * Fetches multiple parameters from AWS Parameter Store in a batch.
+ *
+ * @param parameterNames The list of parameter names to fetch
+ * @return A map of parameter names to their values
+ * @throws IllegalArgumentException If the parameterNames list is null or empty
+ */
+ public Map<String, String> getParameters(List<String> parameterNames) {
+ if (parameterNames == null || parameterNames.isEmpty()) {
+ logger.error("Parameter names list cannot be null or empty");
+ throw new IllegalArgumentException("Parameter names list cannot be null or empty");
+ }
+
+ try {
+
+ GetParametersRequest request = new GetParametersRequest().withNames(parameterNames);
+ GetParametersResult result = ssmClient.getParameters(request);
+
+ // Map the result to a Map of parameter names and values
+ Map<String, String> parameters = new HashMap<>();
+ result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue()));
+
+ // Log any parameters that were not found
+ if (!result.getInvalidParameters().isEmpty()) {
+ logger.warn("The following parameters were not found: {}", result.getInvalidParameters());
+ }
+
+ return parameters;
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameters: {}", parameterNames, e);
+ throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e);
+ }
+ }
+
+ public Long updateParameter(String key, String value) {
+ if (key == null || key.trim().isEmpty()) {
+ logger.error("Parameter name cannot be null or blank");
+ throw new IllegalArgumentException("Parameter name cannot be null or blank");
+ }
+
+ try {
+ PutParameterRequest request = new PutParameterRequest().withName(key).withValue(value).withOverwrite(true);
+
+ PutParameterResult response = ssmClient.putParameter(request);
+ logger.info("Successfully updated parameter: " + key + " with value: " + value + ", Update Version: " + response.getVersion());
+ return response.getVersion();
+ } catch (Exception e) {
+ logger.error("Failed to update parameter: " + key + " with value: " + value, e);
+ throw new RuntimeException("Unexpected error updating parameter: " + key + " with value: " + value, e);
+ }
+ }
+
+}
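
A brief usage sketch; the parameter paths follow the conventions used elsewhere in this change, and universe stands for the environment name:

    ParameterStoreUtil util = new ParameterStoreUtil();
    String throttler = util.getParameter("/" + universe + "/emodb/databusThrottler");
    Map<String, String> kafkaParams = util.getParameters(Arrays.asList(
            "/" + universe + "/emodb/kafka/batchSize",
            "/" + universe + "/emodb/kafka/retries"));
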
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
index 48da779c69..442de20665 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/CompactorTest.java
@@ -16,6 +16,7 @@
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.test.SystemClock;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.table.db.Table;
@@ -485,7 +486,7 @@ public void compact(Table table, String key, UUID compactionKey, Compaction comp
}
};
- final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry());
+ final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), mock(KafkaProducerService.class));
// Create a table for our test
dataStore.createTable(tableName,
@@ -571,7 +572,7 @@ public Record read(Key key, ReadConsistency ignored) {
// Configure the data DAO to read 10 columns initially, causing other column reads to be read lazily
dataDAO.setColumnBatchSize(10);
- final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry());
+ final DataStore dataStore = new InMemoryDataStore(dataDAO, new MetricRegistry(), mock(KafkaProducerService.class));
// Create a table for our test
dataStore.createTable(tableName,
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
index 3cf9b7b50f..bb66c772fd 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/DataStoreTest.java
@@ -14,6 +14,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.test.SystemClock;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.codahale.metrics.MetricRegistry;
@@ -33,6 +34,7 @@
import java.util.Set;
import java.util.UUID;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -47,7 +49,7 @@ public class DataStoreTest {
@Test
public void testDeltas() throws Exception {
- DataStore store = new InMemoryDataStore(new MetricRegistry());
+ DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
assertFalse(store.getTableExists(TABLE));
@@ -167,7 +169,7 @@ public void testDeltas() throws Exception {
@Test
public void testRecordTimestamps() throws Exception {
- DataStore store = new InMemoryDataStore(new MetricRegistry());
+ DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
assertFalse(store.getTableExists(TABLE));
@@ -262,7 +264,7 @@ record = store.get(TABLE, KEY1);
@Test
public void testRecordTimestampsWithEventTags() throws Exception {
- DataStore store = new InMemoryDataStore(new MetricRegistry());
+ DataStore store = new InMemoryDataStore(new MetricRegistry(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
assertFalse(store.getTableExists(TABLE));
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
index ac585fa220..6030144919 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/MinSplitSizeTest.java
@@ -10,6 +10,7 @@
import com.bazaarvoice.emodb.sor.db.astyanax.ChangeEncoder;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.astyanax.PlacementCache;
@@ -43,7 +44,7 @@ public List getSplits(Table table, int recordsPerSplit, int localResplit
}
};
- DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry());
+ DataStore dataStore = new InMemoryDataStore(dataDao, new MetricRegistry(), mock(KafkaProducerService.class));
dataStore.createTable("table", new TableOptionsBuilder().setPlacement("default").build(),
Collections.emptyMap(), new AuditBuilder().build());
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
index 7377838dc5..36bdcdf8e4 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/RedundantDeltaTest.java
@@ -24,6 +24,7 @@
import com.bazaarvoice.emodb.sor.db.test.DeltaClusteringKey;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
@@ -65,7 +66,7 @@ public void testRedundantDeltas() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -122,7 +123,7 @@ public void testMinUUIDDelta() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -159,7 +160,7 @@ public void testRedundancyWithTags() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -240,7 +241,7 @@ public void testTagsForNestedMapDeltas() {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -260,7 +261,7 @@ public void testRedundancyWithCompactionAndUnchangedTag() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), new InMemoryTableDAO(), dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -337,7 +338,7 @@ public void testPartialCompactionWithNoRedundancy() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
@@ -409,7 +410,7 @@ public void testPartialCompactionWithRedundancy() throws Exception {
DefaultDataStore store = new DefaultDataStore(new DatabusEventWriterRegistry(), tableDao, dataDao, dataDao,
new NullSlowQueryLog(), new DiscardingExecutorService(), new InMemoryHistoryStore(),
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), new MetricRegistry(), Clock.systemUTC(), mock(KafkaProducerService.class));
TableOptions options = new TableOptionsBuilder().setPlacement("default").build();
store.createTable(TABLE, options, Collections.emptyMap(), newAudit("create table"));
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
index 60574f680c..047c373f72 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/core/SorUpdateTest.java
@@ -8,6 +8,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryDataStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
import com.bazaarvoice.emodb.sor.delta.Deltas;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.test.SystemClock;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
@@ -19,6 +20,7 @@
import java.util.UUID;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -34,7 +36,7 @@ public class SorUpdateTest {
public void SetupTest() {
final InMemoryDataReaderDAO dataDAO = new InMemoryDataReaderDAO();
_eventWriterRegistry = new DatabusEventWriterRegistry();
- _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry());
+ _dataStore = new InMemoryDataStore(_eventWriterRegistry, dataDAO, new MetricRegistry(), mock(KafkaProducerService.class));
// Create a table for our test
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
index c67f985342..66e4d5fe2b 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/sor/test/MultiDCDataStores.java
@@ -11,6 +11,7 @@
import com.bazaarvoice.emodb.sor.core.test.InMemoryHistoryStore;
import com.bazaarvoice.emodb.sor.core.test.InMemoryMapStore;
import com.bazaarvoice.emodb.sor.db.test.InMemoryDataReaderDAO;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.sor.log.NullSlowQueryLog;
import com.bazaarvoice.emodb.table.db.TableDAO;
import com.bazaarvoice.emodb.table.db.test.InMemoryTableDAO;
@@ -20,6 +21,8 @@
import java.time.Clock;
import java.util.Optional;
+import static org.mockito.Mockito.mock;
+
/**
* Wrapper around a set of {@link DataStore} instances that replicate to each other,
* simulating a set of eventually consistent data centers.
@@ -63,12 +66,12 @@ public MultiDCDataStores(int numDCs, boolean asyncCompacter, MetricRegistry metr
if (asyncCompacter) {
_stores[i] = new DefaultDataStore(new SimpleLifeCycleRegistry(), metricRegistry, new DatabusEventWriterRegistry(), _tableDao,
_inMemoryDaos[i].setHistoryStore(_historyStores[i]), _replDaos[i], new NullSlowQueryLog(), _historyStores[i],
- Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC());
+ Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(), new DiscardingAuditWriter(), new InMemoryMapStore<>(), Clock.systemUTC(), mock(KafkaProducerService.class));
} else {
_stores[i] = new DefaultDataStore(new DatabusEventWriterRegistry(), _tableDao, _inMemoryDaos[i].setHistoryStore(_historyStores[i]),
_replDaos[i], new NullSlowQueryLog(), MoreExecutors.newDirectExecutorService(), _historyStores[i],
Optional.empty(), new InMemoryCompactionControlSource(), Conditions.alwaysFalse(),
- new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC());
+ new DiscardingAuditWriter(), new InMemoryMapStore<>(), metricRegistry, Clock.systemUTC(), mock(KafkaProducerService.class));
}
}
}
diff --git a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
index 5ad5ff357f..f86ba00818 100644
--- a/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
+++ b/sor/src/test/java/com/bazaarvoice/emodb/table/db/astyanax/TableLifeCycleTest.java
@@ -27,6 +27,7 @@
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Deltas;
import com.bazaarvoice.emodb.sor.delta.MapDeltaBuilder;
+import com.bazaarvoice.emodb.sor.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.table.db.MoveType;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.TableBackingStore;
@@ -1981,7 +1982,7 @@ dataCenter, mock(RateLimiterCache.class), dataCopyDAO, dataPurgeDAO,
}
private InMemoryDataStore newBackingStore(MetricRegistry metricRegistry) {
- InMemoryDataStore store = new InMemoryDataStore(metricRegistry);
+ InMemoryDataStore store = new InMemoryDataStore(metricRegistry, mock(KafkaProducerService.class));
store.createTable("__system:table", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
store.createTable("__system:table_uuid", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
store.createTable("__system:table_unpublished_databus_events", newOptions(PL_GLOBAL), ImmutableMap.of(), newAudit());
diff --git a/table/pom.xml b/table/pom.xml
index 3179fd7ab6..bf8da54674 100644
--- a/table/pom.xml
+++ b/table/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/uac-api/pom.xml b/uac-api/pom.xml
index 70fe6fade6..b09909fc1d 100644
--- a/uac-api/pom.xml
+++ b/uac-api/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml
index a718815649..ec42f043a3 100644
--- a/uac-client-jersey2/pom.xml
+++ b/uac-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/uac-client/pom.xml b/uac-client/pom.xml
index 936a2ee1dc..e9dff54e69 100644
--- a/uac-client/pom.xml
+++ b/uac-client/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/web-local/pom.xml b/web-local/pom.xml
index 5b20530fd1..08dcb75859 100644
--- a/web-local/pom.xml
+++ b/web-local/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/web/pom.xml b/web/pom.xml
index 150e019232..2016f124be 100644
--- a/web/pom.xml
+++ b/web/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.bazaarvoice.emodb</groupId>
<artifactId>emodb-parent</artifactId>
- <version>6.5.73-SNAPSHOT</version>
+ <version>6.5.215-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java b/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java
index 8a15f6c970..48a6059960 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java
@@ -304,9 +304,9 @@ private void evaluateQueue()
// Start the Queue service
ResourceRegistry resources = _injector.getInstance(ResourceRegistry.class);
// Start the Queue service
- resources.addResource(_cluster, "emodb-queue-1", new QueueResource1(queueService, queueClient));
+ resources.addResource(_cluster, "emodb-queue-1", new QueueResource1(queueService, queueClient, _environment.metrics()));
// Start the Dedup Queue service
- resources.addResource(_cluster, "emodb-dedupq-1", new DedupQueueResource1(dedupQueueService, dedupQueueClient));
+ resources.addResource(_cluster, "emodb-dedupq-1", new DedupQueueResource1(dedupQueueService, dedupQueueClient, _environment.metrics()));
}
private void evaluateScanner()
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
index a3869e8214..b86b38943d 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
@@ -1,13 +1,9 @@
package com.bazaarvoice.emodb.web.resources.blob;
+import com.amazonaws.AmazonClientException;
import com.bazaarvoice.emodb.auth.jersey.Authenticated;
import com.bazaarvoice.emodb.auth.jersey.Subject;
-import com.bazaarvoice.emodb.blob.api.Blob;
-import com.bazaarvoice.emodb.blob.api.BlobMetadata;
-import com.bazaarvoice.emodb.blob.api.BlobStore;
-import com.bazaarvoice.emodb.blob.api.Range;
-import com.bazaarvoice.emodb.blob.api.RangeSpecification;
-import com.bazaarvoice.emodb.blob.api.Table;
+import com.bazaarvoice.emodb.blob.api.*;
import com.bazaarvoice.emodb.common.api.UnauthorizedException;
import com.bazaarvoice.emodb.common.json.LoggingIterator;
import com.bazaarvoice.emodb.sor.api.Audit;
@@ -17,6 +13,9 @@
import com.bazaarvoice.emodb.web.auth.resource.NamedResource;
import com.bazaarvoice.emodb.web.jersey.params.SecondsParam;
import com.bazaarvoice.emodb.web.resources.SuccessResponse;
+import com.bazaarvoice.emodb.web.resources.blob.messageQueue.MessagingService;
+import com.bazaarvoice.emodb.web.resources.blob.messageQueue.SQSMessageException;
+import com.bazaarvoice.emodb.web.resources.blob.messageQueue.SQSServiceFactory;
import com.bazaarvoice.emodb.web.resources.sor.AuditParam;
import com.bazaarvoice.emodb.web.resources.sor.TableOptionsParam;
import com.codahale.metrics.Meter;
@@ -37,44 +36,24 @@
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.compress.utils.IOUtils;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.coursera.metrics.datadog.TaggedName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HEAD;
-import javax.ws.rs.HeaderParam;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
-import javax.ws.rs.core.UriInfo;
+import javax.ws.rs.*;
+import javax.ws.rs.core.*;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Spliterators;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
-
import static java.lang.String.format;
@Path("/blob/1")
@@ -84,6 +63,8 @@
public class BlobStoreResource1 {
private static final Logger _log = LoggerFactory.getLogger(BlobStoreResource1.class);
+ private final MessagingService _messagingService;
+
private static final String X_BV_PREFIX = "X-BV-"; // HTTP header prefix for BlobMetadata other than attributes
private static final String X_BVA_PREFIX = "X-BVA-"; // HTTP header prefix for BlobMetadata attributes
private static final Pattern CONTENT_ENCODING = Pattern.compile("content[-_]?encoding", Pattern.CASE_INSENSITIVE);
@@ -110,11 +91,11 @@ public class BlobStoreResource1 {
private final LoadingCache<String, Meter> _putObjectRequestsByApiKey;
private final LoadingCache<String, Meter> _deleteObjectRequestsByApiKey;
+
public BlobStoreResource1(BlobStore blobStore, Set<String> approvedContentTypes, MetricRegistry metricRegistry) {
_blobStore = blobStore;
_approvedContentTypes = approvedContentTypes;
_metricRegistry = metricRegistry;
-
_listTableRequestsByApiKey = createMetricCache("listTablesByApiKey");
_createTableRequestsByApiKey = createMetricCache("createTableByApiKey");
_dropTableRequestsByApiKey = createMetricCache("dropTableByApiKey");
@@ -130,7 +111,7 @@ public BlobStoreResource1(BlobStore blobStore, Set<String> approvedContentTypes,
_getObjectRequestsByApiKey = createMetricCache("getByApiKey");
_putObjectRequestsByApiKey = createMetricCache("putByApiKey");
_deleteObjectRequestsByApiKey = createMetricCache("deleteByApiKey");
-
+ _messagingService = new SQSServiceFactory().createSQSService();
}
private LoadingCache<String, Meter> createMetricCache(String metricName) {
@@ -160,10 +141,10 @@ public Iterator listTables(@QueryParam("from") final String fromKeyExclus
@Authenticated Subject subject) {
_listTableRequestsByApiKey.getUnchecked(subject.getId()).mark();
return streamingIterator(
- StreamSupport.stream(Spliterators.spliteratorUnknownSize(_blobStore.listTables(Strings.emptyToNull(fromKeyExclusive), Long.MAX_VALUE), 0), false)
- .filter(input -> subject.hasPermission(Permissions.readBlobTable(new NamedResource(input.getName()))))
- .limit(limit.get())
- .iterator()
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(_blobStore.listTables(Strings.emptyToNull(fromKeyExclusive), Long.MAX_VALUE), 0), false)
+ .filter(input -> subject.hasPermission(Permissions.readBlobTable(new NamedResource(input.getName()))))
+ .limit(limit.get())
+ .iterator()
);
}
@@ -191,8 +172,16 @@ public SuccessResponse createTable(@PathParam("table") String table,
if (!subject.hasPermission(Permissions.createBlobTable(resource))) {
throw new UnauthorizedException();
}
-
_blobStore.createTable(table, options, attributes, audit);
+ try {
+ _messagingService.sendCreateTableSQS(table, options, attributes, audit);
+ } catch (IOException | AmazonClientException e) {
+ _log.error("Failed to send create table message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Failed to send create table message to SQS", e);
+ } catch (RuntimeException e) {
+ _log.error("Unexpected error occurred while sending create table message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Unexpected error occurred while sending create table message to SQS", e);
+ }
return SuccessResponse.instance();
}
@@ -211,6 +200,15 @@ public SuccessResponse dropTable(@PathParam("table") String table,
_dropTableRequestsByApiKey.getUnchecked(subject.getId()).mark();
Audit audit = getRequired(auditParam, "audit");
_blobStore.dropTable(table, audit);
+ try {
+ _messagingService.sendDeleteTableSQS(table, audit);
+ } catch (IOException | AmazonClientException e) {
+ _log.error("Failed to send delete table message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Failed to send delete table message to SQS", e);
+ } catch (RuntimeException e) {
+ _log.error("Unexpected error occurred while sending delete table message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Unexpected error occurred while sending delete table message to SQS", e);
+ }
return SuccessResponse.instance();
}
@@ -228,6 +226,15 @@ public SuccessResponse purgeTable(@PathParam("table") String table,
_purgeTableRequestsByApiKey.getUnchecked(subject.getId()).mark();
Audit audit = getRequired(auditParam, "audit");
_blobStore.purgeTableUnsafe(table, audit);
+ try {
+ _messagingService.purgeTableSQS(table, audit);
+ } catch (IOException | AmazonClientException | UnsupportedOperationException e) {
+ _log.error("Failed to send purge table message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Failed to send purge table message to SQS", e);
+ } catch (RuntimeException e) {
+ _log.error("Unexpected error occurred while sending purge table message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Unexpected error occurred while sending purge table message to SQS", e);
+ }
return SuccessResponse.instance();
}
@@ -262,6 +269,18 @@ public SuccessResponse setTableAttributes(@PathParam("table") String table,
_setTableAttributesRequestsByApiKey.getUnchecked(subject.getId()).mark();
Audit audit = getRequired(auditParam, "audit");
_blobStore.setTableAttributes(table, attributes, audit);
+ try {
+ // Send the table attributes to the SQS queue
+ _messagingService.putTableAttributesSQS(table, attributes, audit);
+
+ } catch (IOException | AmazonClientException e) {
+ _log.error("Failed to send put table attributes message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Failed to send put table attributes message to SQS", e);
+ } catch (RuntimeException e) {
+ _log.error("Unexpected error occurred while sending put table attributes message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Unexpected error occurred while sending put table attributes message to SQS", e);
+ }
+
return SuccessResponse.instance();
}
@@ -362,8 +381,6 @@ public Collection<String> getTablePlacements(@Authenticated Subject subject) {
_getTablePlacementsRequestsByApiKey.getUnchecked(subject.getId()).mark();
return _blobStore.getTablePlacements();
}
-
-
/**
* Retrieves the current version of a piece of content from the data store.
*/
@@ -467,6 +484,7 @@ public SuccessResponse put(@PathParam("table") String table,
InputStream in,
@QueryParam("ttl") SecondsParam ttlParam,
@Context HttpHeaders headers,
+ @Context UriInfo uriInfo,
@Authenticated Subject subject)
throws IOException {
_putObjectRequestsByApiKey.getUnchecked(subject.getId()).mark();
@@ -492,8 +510,25 @@ public SuccessResponse put(@PathParam("table") String table,
throw new IllegalArgumentException(String.format("Ttl:%s is specified for blobId:%s", ttl, blobId));
}
+ byte[] byteArray = IOUtils.toByteArray(in);
+
+ String requestUrl = uriInfo.getRequestUri().toString();
+
// Perform the put
- _blobStore.put(table, blobId, onceOnlySupplier(in), attributes);
+ InputStream inputStream = new ByteArrayInputStream(byteArray);
+ _blobStore.put(table, blobId, onceOnlySupplier(inputStream), attributes);
+
+ // Notify SQS of the put request (metadata and request URL only; the blob payload itself is not sent)
+ try {
+ _messagingService.sendPutRequestSQS(table, blobId, attributes, requestUrl);
+ } catch (IOException | AmazonClientException e) {
+ _log.error("Failed to send put blob message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Failed to send put blob message to SQS", e);
+ } catch (RuntimeException e) {
+ _log.error("Unexpected error occurred while sending put blob message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Unexpected error occurred while sending put blob message to SQS", e);
+ }
+
return SuccessResponse.instance();
}
@@ -510,6 +545,15 @@ public SuccessResponse delete(@PathParam("table") String table,
@PathParam("blobId") String blobId,
@Authenticated Subject subject) {
_deleteObjectRequestsByApiKey.getUnchecked(subject.getId()).mark();
+ try {
+ _messagingService.sendDeleteRequestSQS(table, blobId);
+ } catch (IOException | AmazonClientException e) {
+ _log.error("Failed to send delete blob message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Failed to send delete blob message to SQS", e);
+ } catch (RuntimeException e) {
+ _log.error("Unexpected error occurred while sending delete blob message to SQS for table {}: {}", table, e.getMessage());
+ throw new SQSMessageException("Unexpected error occurred while sending delete blob message to SQS", e);
+ }
_blobStore.delete(table, blobId);
return SuccessResponse.instance();
}
@@ -554,4 +598,4 @@ private Supplier onceOnlySupplier(final InputStream in) {
return in;
};
}
-}
+}
\ No newline at end of file
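Each table and blob mutation in BlobStoreResource1 now follows the same shape: perform the primary write, then forward a notification to SQS, converting checked and AWS client failures into SQSMessageException. Note two consequences of this ordering: the whole blob is buffered in memory before the put, and if the SQS send fails the caller receives an error even though the primary write already landed. A sketch of how the six near-identical try/catch blocks could be factored (the SqsNotifications class, notifyOrFail helper, and SqsCall interface are hypothetical, not part of this change):

import com.amazonaws.AmazonClientException;
import com.bazaarvoice.emodb.web.resources.blob.messageQueue.SQSMessageException;
import java.io.IOException;

final class SqsNotifications {
    @FunctionalInterface
    interface SqsCall {
        void run() throws IOException;
    }

    // Normalizes checked and AWS client failures into SQSMessageException,
    // exactly as the inline try/catch blocks above do.
    static void notifyOrFail(String action, String table, SqsCall call) {
        try {
            call.run();
        } catch (IOException | AmazonClientException e) {
            throw new SQSMessageException("Failed to send " + action + " message to SQS for table " + table, e);
        } catch (RuntimeException e) {
            throw new SQSMessageException("Unexpected error sending " + action + " message to SQS for table " + table, e);
        }
    }
}

// Example call site, replacing one inline block:
// SqsNotifications.notifyOrFail("create table", table,
//         () -> _messagingService.sendCreateTableSQS(table, options, attributes, audit));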
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java
new file mode 100644
index 0000000000..d6caaded9a
--- /dev/null
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java
@@ -0,0 +1,21 @@
+package com.bazaarvoice.emodb.web.resources.blob.messageQueue;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.bazaarvoice.emodb.sor.api.Audit;
+import com.bazaarvoice.emodb.sor.api.TableOptions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Interface for interacting with a messaging service.
+ */
+public interface MessagingService {
+ void sendPutRequestSQS(String table, String blobId, Map<String, String> attributes, String requestUrl) throws IOException;
+ void sendDeleteRequestSQS(String table, String blobId) throws IOException;
+ void sendCreateTableSQS(String table, TableOptions options, Map<String, String> attributes, Audit audit) throws JsonProcessingException;
+ void sendDeleteTableSQS(String table, Audit audit) throws IOException;
+ void purgeTableSQS(String table, Audit audit) throws IOException;
+ void putTableAttributesSQS(String table, Map<String, String> attributes, Audit audit) throws JsonProcessingException;
+}
+
+
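Because BlobStoreResource1 depends on the MessagingService interface rather than on SQSService directly, a no-op implementation can stand in where no SQS queue exists (local development, unit tests). A hypothetical sketch, assumed to live in the same messageQueue package, with signatures matching the interface above:

import com.bazaarvoice.emodb.sor.api.Audit;
import com.bazaarvoice.emodb.sor.api.TableOptions;
import java.util.Map;

public class NoOpMessagingService implements MessagingService {
    // All methods intentionally do nothing.
    @Override public void sendPutRequestSQS(String table, String blobId, Map<String, String> attributes, String requestUrl) { }
    @Override public void sendDeleteRequestSQS(String table, String blobId) { }
    @Override public void sendCreateTableSQS(String table, TableOptions options, Map<String, String> attributes, Audit audit) { }
    @Override public void sendDeleteTableSQS(String table, Audit audit) { }
    @Override public void purgeTableSQS(String table, Audit audit) { }
    @Override public void putTableAttributesSQS(String table, Map<String, String> attributes, Audit audit) { }
}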
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java
new file mode 100644
index 0000000000..edc35e6acc
--- /dev/null
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java
@@ -0,0 +1,7 @@
+package com.bazaarvoice.emodb.web.resources.blob.messageQueue;
+
+public class SQSMessageException extends RuntimeException {
+ public SQSMessageException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java
new file mode 100644
index 0000000000..08487fdaee
--- /dev/null
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java
@@ -0,0 +1,136 @@
+package com.bazaarvoice.emodb.web.resources.blob.messageQueue;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
+import com.amazonaws.services.sqs.model.SendMessageResult;
+import com.bazaarvoice.emodb.sor.api.Audit;
+import com.bazaarvoice.emodb.sor.api.TableOptions;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Service class for interacting with Amazon SQS (Simple Queue Service).
+ */
+public class SQSService implements MessagingService {
+ private static final Logger _log = LoggerFactory.getLogger(SQSService.class);
+
+ private final AmazonSQS sqs;
+ private final String queueUrl;
+ private final ObjectMapper objectMapper;
+
+ /**
+ * Constructor for SQSService.
+ *
+ * @param queueName The name of the SQS queue to send messages to.
+ * @param objectMapper ObjectMapper for converting messages to JSON format.
+ * @param sqs AmazonSQS for sending messages
+ */
+ public SQSService(String queueName, ObjectMapper objectMapper, AmazonSQS sqs) {
+ this.objectMapper = objectMapper;
+ this.sqs = sqs;
+ this.queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
+ }
+
+ @Override
+ public void sendPutRequestSQS(String table, String blobId, Map<String, String> attributes, String requestUrl) {
+ Map<String, Object> messageMap = new HashMap<>();
+ messageMap.put("method", "PUT_TABLE_BLOBID");
+ messageMap.put("tenantName", "datastorage");
+ messageMap.put("requestUrl", requestUrl);
+ messageMap.put("table", table);
+ messageMap.put("blobId", blobId);
+ messageMap.put("attributes", attributes);
+
+ _log.debug("Sending PUT request to SQS. Table: {}, BlobId: {}, RequestUrl: {}", table, blobId, requestUrl);
+ sendMessageSQS(messageMap);
+ }
+
+ @Override
+ public void sendDeleteRequestSQS(String table, String blobId) {
+ Map<String, Object> messageMap = new HashMap<>();
+ messageMap.put("method", "DELETE_BLOB");
+ messageMap.put("table", table);
+ messageMap.put("blobId", blobId);
+ sendMessageSQS(messageMap);
+ }
+
+ @Override
+ public void sendCreateTableSQS(String table, TableOptions options, Map<String, String> attributes, Audit audit) {
+ Map<String, Object> messageMap = new HashMap<>();
+ messageMap.put("method", "CREATE_TABLE");
+ messageMap.put("table", table);
+ messageMap.put("options", options);
+ messageMap.put("attributes", attributes);
+ messageMap.put("audit", audit);
+ sendMessageSQS(messageMap);
+ }
+
+ @Override
+ public void sendDeleteTableSQS(String table, Audit audit) {
+ Map<String, Object> messageMap = new HashMap<>();
+ messageMap.put("method", "DELETE_TABLE");
+ messageMap.put("table", table);
+ messageMap.put("audit", audit);
+ sendMessageSQS(messageMap);
+ }
+
+ @Override
+ public void purgeTableSQS(String table, Audit audit) {
+ Map<String, Object> messageMap = new HashMap<>();
+ messageMap.put("method", "PURGE_TABLE");
+ messageMap.put("table", table);
+ messageMap.put("audit", audit);
+ sendMessageSQS(messageMap);
+ }
+
+ @Override
+ public void putTableAttributesSQS(String table, Map<String, String> attributes, Audit audit) {
+ Map<String, Object> messageMap = new HashMap<>();
+ messageMap.put("method", "SET_TABLE_ATTRIBUTE");
+ messageMap.put("table", table);
+ messageMap.put("attributes", attributes);
+ messageMap.put("audit", audit);
+ sendMessageSQS(messageMap);
+ }
+
+ private void sendMessageSQS(Map<String, Object> messageMap) {
+ try {
+ String messageBody = objectMapper.writeValueAsString(messageMap);
+ String messageGroupId = "blob";
+ SendMessageRequest sendMessageRequest = new SendMessageRequest()
+ .withQueueUrl(queueUrl)
+ .withMessageBody(messageBody)
+ .withMessageGroupId(messageGroupId);
+ SendMessageResult result = sqs.sendMessage(sendMessageRequest);
+ _log.info("Message sent successfully to SQS. Message ID: {}", result.getMessageId());
+ } catch (JsonProcessingException e) {
+ _log.error("Error converting message to JSON: {}", e.getMessage());
+ throw new SQSMessageException("Failed to convert message to JSON", e);
+ } catch (AmazonServiceException e) {
+ _log.error("AmazonServiceException: {}", e.getMessage());
+ throw new SQSMessageException("AWS service error occurred while sending message to SQS", e);
+ } catch (AmazonClientException e) {
+ _log.error("AmazonClientException: {}", e.getMessage());
+ throw new SQSMessageException("Client error occurred while sending message to SQS", e);
+ } catch (Exception e) {
+ _log.error("Unexpected error occurred: {}", e.getMessage(), e);
+ throw new SQSMessageException("Unexpected error occurred while sending message to SQS", e);
+ }
+ }
+}
+
+
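SQSService sets a MessageGroupId but no MessageDeduplicationId, so the FIFO queue is presumably configured with content-based deduplication; otherwise sends will be rejected. A sketch of a unit test for the class above with a mocked AmazonSQS client (queue URL, table, and blob names are illustrative):

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class SQSServiceTest {
    @Test
    public void testSendDeleteRequest() {
        AmazonSQS sqs = mock(AmazonSQS.class);
        // The constructor resolves the queue URL eagerly, so stub it first.
        when(sqs.getQueueUrl("blobMigrationQueue.fifo"))
                .thenReturn(new GetQueueUrlResult().withQueueUrl("https://sqs.local/blobMigrationQueue.fifo"));
        when(sqs.sendMessage(any(SendMessageRequest.class)))
                .thenReturn(new SendMessageResult().withMessageId("m-1"));

        SQSService service = new SQSService("blobMigrationQueue.fifo", new ObjectMapper(), sqs);
        service.sendDeleteRequestSQS("media:photo", "blob-123");

        ArgumentCaptor<SendMessageRequest> captor = ArgumentCaptor.forClass(SendMessageRequest.class);
        verify(sqs).sendMessage(captor.capture());
        assertTrue(captor.getValue().getMessageBody().contains("DELETE_BLOB"));
        assertEquals(captor.getValue().getMessageGroupId(), "blob");
    }
}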
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java
new file mode 100644
index 0000000000..9638a8bb1d
--- /dev/null
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java
@@ -0,0 +1,12 @@
+package com.bazaarvoice.emodb.web.resources.blob.messageQueue;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SQSServiceFactory {
+ public MessagingService createSQSService() {
+ return new SQSService("blobMigrationQueue.fifo", new ObjectMapper(), AmazonSQSClientBuilder.standard().build());
+ }
+}
+
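The factory hard-codes the queue name and relies on the default AWS region and credentials chain. A hypothetical variant that makes the queue name overridable per environment (the BLOB_MIGRATION_QUEUE variable name is an assumption, not part of this change):

import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigurableSQSServiceFactory {
    public MessagingService createSQSService() {
        // Fall back to the queue name used above when no override is provided.
        String queueName = System.getenv().getOrDefault("BLOB_MIGRATION_QUEUE", "blobMigrationQueue.fifo");
        return new SQSService(queueName, new ObjectMapper(), AmazonSQSClientBuilder.standard().build());
    }
}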
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java
index 8e9f9b2e21..7c73fd693d 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/AbstractSubjectDatabus.java
@@ -61,6 +61,11 @@ public long getEventCountUpTo(Subject subject, @PartitionKey String subscription
return databus(subject).getEventCountUpTo(subscription, limit);
}
+ @Override
+ public long getMasterCount(Subject subject) {
+ return databus(subject).getMasterCount();
+ }
+
@Override
public long getClaimCount(Subject subject, @PartitionKey String subscription) {
return databus(subject).getClaimCount(subscription);
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java
index ca356d0ac9..91e23b90e9 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/DatabusResource1.java
@@ -173,6 +173,18 @@ public long getEventCount(@QueryParam ("partitioned") BooleanParam partitioned,
}
}
+ @GET
+ @Path ("{channel}/uncached_size")
+ @Timed (name = "bv.emodb.databus.DatabusResource1.getMasterEventCount", absolute = true)
+ @ApiOperation (value = "Gets the master event count.",
+ notes = "Returns a long.",
+ response = long.class
+ )
+ public long getEventCountInMaster(@QueryParam ("partitioned") BooleanParam partitioned,
+ @Authenticated Subject subject) {
+ return getClient(partitioned).getMasterCount(subject);
+ }
+
@GET
@Path ("{subscription}/claimcount")
@RequiresPermissions ("databus|get_status|{subscription}")
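The new endpoint above exposes the master event count at {channel}/uncached_size under the databus root. A sketch of calling it with the Java 11+ HttpClient; the host, port, channel name, and API key value are placeholders, not taken from this diff:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MasterCountSketch {
    public static void main(String[] args) throws Exception {
        // DatabusResource1 is mounted at /bus/1; the response body is the raw long.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/bus/1/my-subscription/uncached_size"))
                .header("X-BV-API-Key", "local_admin")
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Master event count: " + response.body());
    }
}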
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java
index 44493228e9..bcac1c3e8b 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/databus/SubjectDatabus.java
@@ -52,6 +52,8 @@ Subscription getSubscription(Subject subject, String subscription)
long getEventCountUpTo(Subject subject, String subscription, long limit);
+ long getMasterCount(Subject subject);
+
long getClaimCount(Subject subject, String subscription);
Iterator<Event> peek(Subject subject, String subscription, int limit);
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java
index 0f24879873..88e0a04dbc 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java
@@ -10,6 +10,8 @@
import com.bazaarvoice.emodb.web.auth.resource.NamedResource;
import com.bazaarvoice.emodb.web.jersey.params.SecondsParam;
import com.bazaarvoice.emodb.web.resources.SuccessResponse;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.annotation.Timed;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -48,10 +50,23 @@ public class DedupQueueResource1 {
private final DedupQueueService _queueService;
private final DedupQueueServiceAuthenticator _queueClient;
+ private final Meter _nullPollDedupCount;
+ private final Meter _messageDedupCount;
+ private final Meter _sendDedupCount;
+ private final Meter _sendNullDedupCount;
+ private final Meter _sendBatchNullDedupCount;
+ private final Meter _sendBatchDedupCount;
- public DedupQueueResource1(DedupQueueService queueService, DedupQueueServiceAuthenticator queueClient) {
+
+ public DedupQueueResource1(DedupQueueService queueService, DedupQueueServiceAuthenticator queueClient, MetricRegistry metricRegistry) {
_queueService = requireNonNull(queueService, "queueService");
_queueClient = requireNonNull(queueClient, "queueClient");
+ _nullPollDedupCount = metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class, "nullPollsDedupCount"));
+ _messageDedupCount = metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class, "polledMessageDedupCount"));
+ _sendDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendDedupCount"));
+ _sendNullDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendNullDedupCount"));
+ _sendBatchDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendBatchDedupCount"));
+ _sendBatchNullDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendBatchNullDedupCount"));
}
@POST
@@ -65,6 +80,12 @@ public DedupQueueResource1(DedupQueueService queueService, DedupQueueServiceAuth
)
public SuccessResponse send(@PathParam("queue") String queue, Object message) {
// Not partitioned--any server can write messages to Cassandra.
+ if (message == null) {
+ _sendNullDedupCount.mark();
+ } else {
+ _sendDedupCount.mark();
+ }
_queueService.send(queue, message);
return SuccessResponse.instance();
}
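The send() hunk above marks one meter per call, splitting null payloads from real messages. A tiny sketch of how these Dropwizard metric names resolve, which is what a dashboard or alert would key on (the fully qualified class name comes from this diff):

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class MeterNameSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        // MetricRegistry.name(...) joins its parts with dots, yielding
        // "com.bazaarvoice.emodb.web.resources.queue.DedupQueueResource1.sendDedupCount".
        String name = MetricRegistry.name(
                "com.bazaarvoice.emodb.web.resources.queue.DedupQueueResource1", "sendDedupCount");
        Meter meter = registry.meter(name);
        meter.mark();
        System.out.println(name + " count=" + meter.getCount());
    }
}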
@@ -80,9 +101,29 @@ public SuccessResponse send(@PathParam("queue") String queue, Object message) {
)
public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection