load(SizeCacheKey key)
return Maps.immutableEntry(internalMessageCountUpTo(key.channelName, key.limitAsked), key.limitAsked);
}
});
- _sendAllMeterAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllAQS"));
- _sendAllMeterNullAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllNullAQS"));
- _pollAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollAQS"));
- _pollNullAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollNullAQS"));
-
}
private void registerMoveQueueJobHandler(JobHandlerRegistry jobHandlerRegistry) {
@@ -109,42 +129,148 @@ public MoveQueueResult run(MoveQueueRequest request)
});
}
+ /**
+ * Retrieves the value of the "isExperiment" flag from the cache if available.
+ * If the value is absent or the cache entry has expired, it is fetched from
+ * AWS Parameter Store and cached.
+ *
+ * The cached value has a TTL (time-to-live) of 5 minutes, after which it is refreshed
+ * from the Parameter Store on the next access.
+ *
+ * @return {@code true} if the experiment is still running, otherwise {@code false}.
+ * @throws RuntimeException if there is an error fetching the value from the cache or Parameter Store.
+ */
+ private boolean getIsExperimentValue() {
+ try {
+ // Attempt to retrieve from cache; if absent or expired, fetch from Parameter Store and cache the result
+ return experimentCache.get(IS_EXPERIMENT, () -> {
+ Boolean checkExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/isExperiment"));
+ _log.info("IS_EXPERIMENT is refreshed {}", checkExperiment);
+ return checkExperiment;
+ });
+ } catch (Exception e) {
+ // Surface any error that occurs while accessing the cache or Parameter Store
+ throw new RuntimeException("Error fetching experiment flag", e);
+ }
+ }
+
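Note: the `experimentCache` field itself is outside this hunk. For context, a minimal sketch of a declaration consistent with the usage above — assuming Guava's `CacheBuilder`, the 5-minute TTL described in the Javadoc, and the `IS_EXPERIMENT` key; the real field may differ:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

// Hypothetical declaration matching the usage in getIsExperimentValue()
private static final String IS_EXPERIMENT = "isExperiment";
private final Cache<String, Boolean> experimentCache = CacheBuilder.newBuilder()
        .expireAfterWrite(5, TimeUnit.MINUTES) // 5-minute TTL per the Javadoc above
        .build();
```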
@Override
public void send(String queue, Object message) {
- sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ boolean isExperiment = getIsExperimentValue();
+ if (!isExperiment) {
+ // Experiment is over; send everything to Kafka
+ sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ } else {
+ // Experiment is still running; check whether the queue is allowed
+ List<String> allowedQueues = fetchAllowedQueues();
+ if (allowedQueues.contains(queue)) {
+ // Send to Kafka only if it is an allowed queue
+ sendAll(Collections.singletonMap(queue, Collections.singleton(message)));
+ } else {
+ // Send to Cassandra (rollback plan)
+ sendAll(queue, Collections.singleton(message), false);
+ }
+ }
}
@Override
public void sendAll(String queue, Collection<?> messages) {
- sendAll(Collections.singletonMap(queue, messages));
+ boolean isExperiment = getIsExperimentValue();
+ if (!isExperiment) {
+ // Experiment is over; send everything to Kafka
+ sendAll(Collections.singletonMap(queue, messages));
+ } else {
+ // Experiment is still running; check whether the queue is allowed
+ List<String> allowedQueues = fetchAllowedQueues();
+ if (allowedQueues.contains(queue)) {
+ // Send to Kafka only if it is an allowed queue
+ sendAll(Collections.singletonMap(queue, messages));
+ } else {
+ // Send to Cassandra (rollback plan)
+ sendAll(queue, messages, false);
+ }
+ }
+ }
+
+
+ private void validateMessage(Object message) {
+ // Check that the message is valid JSON
+ ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
+
+ // Check that the message size does not exceed the allowed limit
+ checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES,
+ "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+ }
+
+ private void validateQueue(String queue, Collection<?> messages) {
+ requireNonNull(queue, "Queue name cannot be null");
+ requireNonNull(messages, "Messages collection cannot be null");
+
+ // Check if the queue name is legal
+ checkLegalQueueName(queue);
}
@Override
public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
requireNonNull(messagesByQueue, "messagesByQueue");
- if(messagesByQueue.keySet().isEmpty()){
- _sendAllMeterNullAQS.mark();
- } else {
- _sendAllMeterAQS.mark(messagesByQueue.keySet().size());
- }
- ImmutableMultimap.Builder<String, ByteBuffer> builder = ImmutableMultimap.builder();
+ ImmutableMultimap.Builder<String, String> builder = ImmutableMultimap.builder();
for (Map.Entry<String, ? extends Collection<?>> entry : messagesByQueue.entrySet()) {
String queue = entry.getKey();
Collection<?> messages = entry.getValue();
- checkLegalQueueName(queue);
- requireNonNull(messages, "messages");
+ validateQueue(queue, messages);
- List<ByteBuffer> events = Lists.newArrayListWithCapacity(messages.size());
- for (Object message : messages) {
- ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
- checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+ List<String> events = Lists.newArrayListWithCapacity(messages.size());
- events.add(messageByteBuffer);
+ // Validate each message
+ for (Object message : messages) {
+ validateMessage(message);
+ events.add(message.toString());
}
builder.putAll(queue, events);
}
+
+ Multimap<String, String> eventsByChannel = builder.build();
+
+ String queueType = determineQueueType();
+ for (Map.Entry<String, Collection<String>> topicEntry : eventsByChannel.asMap().entrySet()) {
+ String queueName = topicEntry.getKey();
+ String topic = "dsq-" + ("dedupq".equals(queueType) ? "dedup-" + queueName : queueName);
+ // Check if the topic exists, if not create it and execute Step Function
+ if (!adminService.createTopicIfNotExists(topic, TOPIC_PARTITION_COUNT, TOPIC_REPLICATION_FACTOR, queueType)) {
+ Map<String, String> parameters = fetchStepFunctionParameters();
+ // Execute Step Function after topic creation
+ startStepFunctionExecution(parameters, queueType, queueName, topic);
+ }
+ producerService.sendMessages(topic, topicEntry.getValue(), queueType);
+ _log.info("Messages sent to topic: {}", topic);
+ }
+ }
+
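For clarity, the topic-naming rule used in sendAll above, restated as a standalone helper (illustrative only; the PR inlines this expression):

```java
// Restates the naming expression used in sendAll(Map) above
static String topicFor(String queueName, String queueType) {
    return "dsq-" + ("dedupq".equals(queueType) ? "dedup-" + queueName : queueName);
}
// topicFor("reviews", "queue")  -> "dsq-reviews"
// topicFor("reviews", "dedupq") -> "dsq-dedup-reviews"
```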
+ @Override
+ public void sendAll(String queue, Collection<?> messages, boolean fromKafka) {
+ // Messages arriving via the Kafka consumer flush path were validated when first
+ // enqueued, so only direct (non-Kafka) sends are validated here
+ if (!fromKafka) {
+ validateQueue(queue, messages);
+ }
+ ImmutableMultimap.Builder<String, ByteBuffer> builder = ImmutableMultimap.builder();
+ List<ByteBuffer> events = Lists.newArrayListWithCapacity(messages.size());
+
+ for (Object message : messages) {
+ ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message));
+ checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES,
+ "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
+ events.add(messageByteBuffer);
+ }
+ builder.putAll(queue, events);
Multimap<String, ByteBuffer> eventsByChannel = builder.build();
_eventStore.addAll(eventsByChannel);
@@ -155,6 +281,11 @@ public long getMessageCount(String queue) {
return getMessageCountUpTo(queue, Long.MAX_VALUE);
}
+ @Override
+ public long getUncachedSize(String queue) {
+ return internalMessageCountUpTo(queue, Long.MAX_VALUE);
+ }
+
@Override
public long getMessageCountUpTo(String queue, long limit) {
// We get the size from cache as a tuple of size, and the limit used to estimate that size
@@ -196,14 +327,8 @@ public List poll(String queue, Duration claimTtl, int limit) {
checkLegalQueueName(queue);
checkArgument(claimTtl.toMillis() >= 0, "ClaimTtl must be >=0");
checkArgument(limit > 0, "Limit must be >0");
- List response = toMessages(_eventStore.poll(queue, claimTtl, limit));
- if(response.isEmpty()){
- _pollNullAQS.mark();
- }
- else{
- _pollAQS.mark(response.size());
- }
- return response;
+
+ return toMessages(_eventStore.poll(queue, claimTtl, limit));
}
@Override
@@ -219,7 +344,6 @@ public void renew(String queue, Collection messageIds, Duration claimTtl
public void acknowledge(String queue, Collection messageIds) {
checkLegalQueueName(queue);
requireNonNull(messageIds, "messageIds");
-
_eventStore.delete(queue, messageIds, true);
}
@@ -298,4 +422,104 @@ private void checkLegalQueueName(String queue) {
"Allowed punctuation characters are -.:@_ and the queue name may not start with a single underscore character. " +
"An example of a valid table name would be 'polloi:provision'.");
}
-}
+ /**
+ * Fetches the necessary Step Function parameters from AWS Parameter Store.
+ */
+ private Map<String, String> fetchStepFunctionParameters() {
+ List<String> parameterNames = Arrays.asList(
+ "/" + UNIVERSE + "/emodb/stepfn/stateMachineArn",
+ "/" + UNIVERSE + "/emodb/stepfn/queueThreshold",
+ "/" + UNIVERSE + "/emodb/stepfn/batchSize",
+ "/" + UNIVERSE + "/emodb/stepfn/interval"
+ );
+
+ try {
+ return parameterStoreUtil.getParameters(parameterNames);
+ } catch (Exception e) {
+ _log.error("Failed to fetch Step Function parameters from Parameter Store", e);
+ throw new RuntimeException("Error fetching Step Function parameters", e);
+ }
+ }
+
+ /**
+ * Executes the Step Function for a given topic after it has been created.
+ */
+ private void startStepFunctionExecution(Map<String, String> parameters, String queueType, String queueName, String topic) {
+ try {
+ String stateMachineArn = parameters.get("/" + UNIVERSE + "/emodb/stepfn/stateMachineArn");
+ int queueThreshold = Integer.parseInt(parameters.get("/" + UNIVERSE + "/emodb/stepfn/queueThreshold"));
+ int batchSize = Integer.parseInt(parameters.get("/" + UNIVERSE + "/emodb/stepfn/batchSize"));
+ int interval = Integer.parseInt(parameters.get("/" + UNIVERSE + "/emodb/stepfn/interval"));
+
+ String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval);
+
+ // Create the timestamp
+ String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds
+
+ // Check if queueType is "dedupq" and prepend "D" to execution name if true
+ String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;
+
+ // Start the Step Function execution
+ stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
+
+ _log.info("Step Function executed for topic: {} with executionName: {}", topic, executionName);
+ } catch (Exception e) {
+ _log.error("Error executing Step Function for topic: {}", topic, e);
+ throw new RuntimeException("Error executing Step Function for topic: " + topic, e);
+ }
+ }
+
+ /**
+ * Determines the queue type based on the event store.
+ */
+ private String determineQueueType() {
+ if (_eventStore.getClass().getName().equals("com.bazaarvoice.emodb.event.dedup.DefaultDedupEventStore")) {
+ return "dedupq";
+ }
+ return "queue";
+ }
+
+ private List<String> fetchAllowedQueues() {
+ try {
+ return new ArrayList<>(allowedQueuesCache.get(ALLOWED_QUEUES, this::fetchAllowedQueuesFromParamStore));
+ } catch (Exception e) {
+ // The parameter is missing or the fetch failed
+ _log.error("Error fetching allowedQueues in fetchAllowedQueues: " + e.getMessage());
+ return Collections.singletonList(""); // Fall back to a single empty-string entry, which matches no real queue
+ }
+ }
+
+ private Set<String> fetchAllowedQueuesFromParamStore() {
+ try {
+ // Fetch the 'allowedQueues' parameter using ParameterStoreUtil
+ String allowedQueuesStr = parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/allowedQueues");
+ _log.info("ALLOWED_QUEUES is refreshed");
+ return new HashSet<>(Arrays.asList(allowedQueuesStr.split(",")));
+ } catch (Exception e) {
+ // The parameter is missing or the fetch failed
+ _log.error("Error fetching allowedQueues: " + e.getMessage());
+ return new HashSet<>(Collections.singletonList("")); // Fall back to a single empty-string entry, which matches no real queue
+ }
+ }
+
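The allowedQueues parameter is assumed to be a comma-separated list of queue names; a hypothetical value and the set it produces:

```java
// Hypothetical value stored at /<UNIVERSE>/emodb/experiment/allowedQueues
String allowedQueuesStr = "reviews,questions,answers";
Set<String> allowed = new HashSet<>(Arrays.asList(allowedQueuesStr.split(",")));
// allowed -> {"reviews", "questions", "answers"}
```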
+ private String createInputPayload(int queueThreshold, int batchSize, String queueType, String queueName, String topicName, int interval) {
+ Map<String, Object> payloadData = new HashMap<>();
+ payloadData.put("queueThreshold", queueThreshold);
+ payloadData.put("batchSize", batchSize);
+ payloadData.put("queueType", queueType);
+ payloadData.put("queueName", queueName);
+ payloadData.put("topicName", topicName);
+ payloadData.put("interval", interval);
+ Map<String, Object> wrappedData = new HashMap<>();
+ wrappedData.put("executionInput", payloadData); // Wrap the data
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.writeValueAsString(wrappedData); // Convert wrapped data to JSON
+ } catch (JsonProcessingException e) {
+ _log.error("Error while converting map to JSON", e);
+ return "{}"; // Return empty JSON object on error
+ }
+ }
+}
\ No newline at end of file
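Putting startStepFunctionExecution and createInputPayload together, an illustrative call and the payload it would produce (values are made up; key order may vary because a HashMap is serialized):

```java
String inputPayload = createInputPayload(1000, 100, "dedupq", "reviews", "dsq-dedup-reviews", 60);
// inputPayload:
// {"executionInput":{"queueThreshold":1000,"batchSize":100,"queueType":"dedupq",
//  "queueName":"reviews","topicName":"dsq-dedup-reviews","interval":60}}
// The matching execution name would be "D_reviews_<timestampMillis>" per startStepFunctionExecution.
```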
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
index 0955166218..9ec8d36606 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java
@@ -4,7 +4,9 @@
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.queue.api.DedupQueueService;
-import com.codahale.metrics.MetricRegistry;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import java.time.Clock;
@@ -12,7 +14,7 @@
public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService {
@Inject
public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
- Clock clock, MetricRegistry metricRegistry) {
- super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock, metricRegistry);
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
+ super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock, adminService, producerService, stepFunctionService);
}
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
index 947572208d..524ca50033 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java
@@ -4,7 +4,9 @@
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.queue.api.QueueService;
-import com.codahale.metrics.MetricRegistry;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import java.time.Clock;
@@ -12,7 +14,7 @@
public class DefaultQueueService extends AbstractQueueService implements QueueService {
@Inject
public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
- Clock clock, MetricRegistry metricRegistry) {
- super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock, metricRegistry);
+ Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
+ super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock, adminService, producerService, stepFunctionService);
}
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java
index 47e24ccf38..b349b19298 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java
@@ -91,6 +91,11 @@ public void purge(String apiKey, String queue) {
_dedupQueueService.purge(queue);
}
+ @Override
+ public void sendAll(String apiKey, String queue, Collection<?> messages, boolean isFlush) {
+ _dedupQueueService.sendAll(queue, messages, isFlush);
+ }
+
@Override
public void sendAll(String apiKey, Map> messagesByQueue) {
_dedupQueueService.sendAll(messagesByQueue);
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java
index 5ceea10a8e..cdafc8935e 100644
--- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java
@@ -95,4 +95,9 @@ public void purge(String apiKey, String queue) {
public void sendAll(String apiKey, Map> messagesByQueue) {
_queueService.sendAll(messagesByQueue);
}
+
+ @Override
+ public void sendAll(String apiKey, String queue, Collection<?> messages, boolean isFlush) {
+
+ }
}
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java
new file mode 100644
index 0000000000..9621617ba3
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java
@@ -0,0 +1,120 @@
+package com.bazaarvoice.emodb.queue.core.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import java.util.Collections;
+
+public class KafkaAdminService {
+ private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class);
+ private final AdminClient adminClient;
+ // Cache for the list of all topics, refreshed after a one-minute TTL
+ private final Cache<String, Set<String>> topicListCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .build();
+
+ private static final String TOPIC_LIST_KEY = "allTopics";
+
+
+ public KafkaAdminService() {
+ this.adminClient = AdminClient.create(KafkaConfig.getAdminProps());
+ }
+
+ /**
+ * Creates a new Kafka topic with the specified configuration if it does not already exist.
+ *
+ * @param topic The name of the topic.
+ * @param numPartitions Number of partitions.
+ * @param replicationFactor Replication factor.
+ * @param queueType The queue type (not used when creating the topic itself).
+ * @return {@code true} if the topic already existed, {@code false} if it was just created.
+ */
+ public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) {
+ Boolean isExisting = isTopicExists(topic);
+ if (!isExisting) {
+ // Create the topic now
+ NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
+ try {
+ adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+ addToCache(topic);
+ _log.info("Created topic: {} with numPartitions: {} and replication factor {} ", topic, numPartitions, replicationFactor);
+ } catch (Exception e) {
+ _log.error("Error creating topic {}: ", topic, e);
+ throw new RuntimeException(e);
+ }
+ }
+ return isExisting;
+ }
+ public void addToCache(String topic) {
+ Set<String> topics = topicListCache.getIfPresent(TOPIC_LIST_KEY);
+ if (topics == null) {
+ topics = new HashSet<>();
+ } else {
+ // Create a new mutable Set if the existing one is unmodifiable
+ topics = new HashSet<>(topics);
+ }
+ topics.add(topic);
+ topicListCache.put(TOPIC_LIST_KEY, topics);
+ _log.info("Added newly created topic to cache: {}", topic);
+ }
+
+
+ /**
+ * Checks if a Kafka topic exists by using a cache to store the list of all topics.
+ * If the cache entry has expired or the cache is empty, it queries the Kafka AdminClient for the topic list.
+ *
+ * The cached list has a TTL (time-to-live) of one minute (see the cache configuration above),
+ * after which it will be refreshed from Kafka on the next access.
+ *
+ * @param topic the name of the Kafka topic to check
+ * @return {@code true} if the topic exists, otherwise {@code false}.
+ * @throws RuntimeException if there is an error fetching the topic list or checking if the topic exists.
+ */
+ public boolean isTopicExists(String topic) {
+ try {
+ // Retrieve the list of topics from the cache
+ Set<String> topics = topicListCache.get(TOPIC_LIST_KEY, this::fetchTopicListFromKafka);
+
+ // Check if the given topic is in the cached list
+ return topics.contains(topic);
+ } catch (ExecutionException e) {
+ _log.error("Failed to check if topic exists: {}", topic, e);
+ throw new RuntimeException("Error checking if topic exists", e);
+ }
+ }
+
+ /**
+ * Fetches the list of all topic names from Kafka AdminClient.
+ * This method is called only when the cache is expired or empty.
+ *
+ * @return a Set containing all topic names.
+ * @throws ExecutionException if there is an error fetching the topic list from Kafka.
+ */
+ private Set<String> fetchTopicListFromKafka() throws ExecutionException {
+ try {
+ _log.info("Fetching topic list from Kafka");
+ return adminClient.listTopics().names().get();
+ } catch (Exception e) {
+ _log.error("Error fetching topic list from Kafka", e);
+ throw new ExecutionException(e);
+ }
+ }
+
+ /**
+ * Closes the AdminClient to release resources.
+ */
+ public void close() {
+ if (adminClient != null) {
+ adminClient.close();
+ }
+ }
+}
\ No newline at end of file
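A usage sketch for the new admin service (the partition count and replication factor here are illustrative; the real values come from the TOPIC_PARTITION_COUNT and TOPIC_REPLICATION_FACTOR constants in AbstractQueueService):

```java
KafkaAdminService admin = new KafkaAdminService();
// Returns true when the topic already existed; false means it was just created
boolean existed = admin.createTopicIfNotExists("dsq-reviews", 3, (short) 2, "queue");
if (!existed) {
    // The topic was just created; this is where AbstractQueueService starts the Step Function
}
admin.close();
```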
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java
new file mode 100644
index 0000000000..13011e8bcc
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java
@@ -0,0 +1,157 @@
+package com.bazaarvoice.emodb.queue.core.kafka;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.Parameter;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class KafkaConfig {
+ private static String bootstrapServersConfig;
+ private static String batchSizeConfig;
+ private static String retriesConfig;
+ private static String lingerMsConfig;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+ // Static SSM Client and configuration using AWS SDK v1
+ private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder
+ .standard()
+ .build();
+
+
+ static {
+ try {
+ final String UNIVERSE = getUniverseFromEnv();
+ // Load configurations from SSM during static initialization
+ Map<String, String> parameterValues = getParameterValues(
+ Arrays.asList(
+ "/" + UNIVERSE + "/emodb/kafka/batchSize",
+ "/" + UNIVERSE + "/emodb/kafka/retries",
+ "/" + UNIVERSE + "/emodb/kafka/lingerMs",
+ "/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
+ )
+ );
+
+ // Set configurations with fallback to defaults if not present
+ // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
+ batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
+
+ // Sets the number of retry attempts for failed Kafka message sends.
+ retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
+
+ // Sets the number of milliseconds a producer is willing to wait before sending a batch out
+ lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
+
+ // Configures the Kafka broker addresses for producer connections.
+ bootstrapServersConfig = parameterValues.get("/" + UNIVERSE + "/emodb/kafka/bootstrapServers");
+
+ logger.info("Kafka configurations loaded successfully from SSM.");
+ } catch (AmazonServiceException e) {
+ logger.error("Failed to load configurations from SSM.", e);
+ throw e;
+ } catch (Exception e) {
+ logger.error("Unexpected error occurred while loading configurations from SSM.", e);
+ throw e;
+ }
+ }
+
+ public static String getUniverseFromEnv() {
+ String filePath = "/etc/environment";
+ logger.info("Reading environment file: " + filePath);
+ Properties environmentProps = new Properties();
+
+ try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ // Skip empty lines or comments
+ if (line.trim().isEmpty() || line.trim().startsWith("#")) {
+ continue;
+ }
+ // Split the line into key-value pair
+ String[] parts = line.split("=", 2);
+ logger.info("parts: " + Arrays.toString(parts));
+ if (parts.length == 2) {
+ String key = parts[0].trim();
+ String value = parts[1].trim();
+ // Remove any surrounding quotes from value
+ value = value.replace("\"", "");
+ environmentProps.put(key, value);
+ }
+ }
+ // Access the environment variables
+ return environmentProps.getProperty("UNIVERSE");
+ } catch (IOException e) {
+ logger.error("Error reading environment file: " + e.getMessage());
+ throw new RuntimeException("Error reading environment file: " + e.getMessage());
+ }
+ }
+ // Fetch parameters from AWS SSM using AWS SDK v1
+ private static Map<String, String> getParameterValues(List<String> parameterNames) {
+ try {
+ GetParametersRequest request = new GetParametersRequest()
+ .withNames(parameterNames)
+ .withWithDecryption(true);
+
+ GetParametersResult response = ssmClient.getParameters(request);
+
+ return response.getParameters().stream()
+ .collect(Collectors.toMap(Parameter::getName, Parameter::getValue));
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from SSM.", e);
+ throw e; // Rethrow or handle the exception if necessary
+ }
+ }
+
+ // Kafka Producer properties
+ public static Properties getProducerProps() {
+ Properties producerProps = new Properties();
+
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retriesConfig));
+ producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig));
+ producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig));
+ producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting
+ logger.info("Kafka Producer properties initialized.");
+ return producerProps;
+ }
+
+ // Kafka Admin properties
+ public static Properties getAdminProps() {
+ Properties adminProps = new Properties();
+
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+ logger.info("Kafka Admin properties initialized.");
+ return adminProps;
+ }
+
+ // Ensure the SSM client is closed when the application shuts down
+ public static void shutdown() {
+ if (ssmClient != null) {
+ try {
+ ssmClient.shutdown();
+ logger.info("SSM client closed successfully.");
+ } catch (Exception e) {
+ logger.error("Error while closing SSM client.", e);
+ }
+ }
+ }
+}
\ No newline at end of file
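Because KafkaConfig resolves everything in a static initializer, callers can build clients directly from it. A sketch, assuming the /<UNIVERSE>/emodb/kafka/* parameters exist in SSM (bootstrapServers has no default and must be present):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = KafkaConfig.getProducerProps();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```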
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java
new file mode 100644
index 0000000000..1a7cc72eee
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java
@@ -0,0 +1,66 @@
+package com.bazaarvoice.emodb.queue.core.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Future;
+
+public class KafkaProducerService {
+ private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class);
+ private final KafkaProducer<String, String> producer;
+
+ public KafkaProducerService() {
+ this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps());
+ _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps());
+ }
+
+ /**
+ * Sends each message from the collection to the specified Kafka topic separately.
+ *
+ * @param topic The Kafka topic.
+ * @param events The collection of messages to be sent.
+ * @param queueType The queue type associated with the topic (not used by the producer itself).
+ */
+ public void sendMessages(String topic, Collection<String> events, String queueType) {
+ LocalDateTime startTime = LocalDateTime.now();
+ _log.info("Sending {} messages to topic '{}'", events.size(), topic);
+ List<Future<RecordMetadata>> futures = new ArrayList<>();
+ // Use async sendMessage and collect futures
+ for (String event : events) {
+ futures.add(producer.send(new ProducerRecord<>(topic, event)));
+ }
+
+ // Wait for all futures to complete
+ for (Future<RecordMetadata> future : futures) {
+ try {
+ future.get(); // Only blocks if a future is not yet complete
+ } catch (Exception e) {
+ _log.error("Error while sending message to Kafka: {}", e.getMessage());
+ throw new RuntimeException("Error sending messages to Kafka", e);
+ }
+ }
+ _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime,LocalDateTime.now()).toMillis());
+ }
+
+
+ /**
+ * Closes the producer to release resources.
+ */
+ public void close() {
+ _log.info("Closing Kafka producer.");
+ try {
+ producer.flush();
+ producer.close();
+ } catch (Exception e) {
+ _log.error("Error while closing Kafka producer: ", e);
+ throw e;
+ }
+ }
+}
\ No newline at end of file
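A usage sketch for the producer service as AbstractQueueService.sendAll uses it (the topic and payloads are illustrative):

```java
KafkaProducerService producerService = new KafkaProducerService();
// Each element of the collection becomes one record on the topic
producerService.sendMessages("dsq-reviews", Arrays.asList("{\"id\":1}", "{\"id\":2}"), "queue");
producerService.close();
```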
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java
new file mode 100644
index 0000000000..d2d65c144f
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java
@@ -0,0 +1,107 @@
+package com.bazaarvoice.emodb.queue.core.ssm;
+
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
+import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest;
+import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult;
+import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException;
+import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for interacting with AWS Parameter Store using AWS SDK v1.
+ */
+public class ParameterStoreUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class);
+ private final AWSSimpleSystemsManagement ssmClient;
+
+ /**
+ * Constructor to initialize the SSM client
+ */
+ public ParameterStoreUtil() {
+ // Create SSM client with default credentials and region
+ ssmClient = AWSSimpleSystemsManagementClientBuilder.standard()
+ .build();
+ }
+
+ /**
+ * Fetches a parameter from AWS Parameter Store.
+ *
+ * @param parameterName The name of the parameter to fetch
+ * @return The value of the parameter
+ * @throws IllegalArgumentException If the parameterName is null or empty
+ */
+ public String getParameter(String parameterName) {
+ if (parameterName == null || parameterName.isEmpty()) {
+ logger.error("Parameter name cannot be null or empty");
+ throw new IllegalArgumentException("Parameter name cannot be null or empty");
+ }
+
+ try {
+
+ GetParameterRequest request = new GetParameterRequest().withName(parameterName);
+ GetParameterResult result = ssmClient.getParameter(request);
+ return result.getParameter().getValue();
+
+ } catch (ParameterNotFoundException e) {
+ logger.error("Parameter not found: {}", parameterName, e);
+ throw new RuntimeException("Parameter not found: " + parameterName, e);
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameter: {}", parameterName, e);
+ throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e);
+ }
+ }
+
+ /**
+ * Fetches multiple parameters from AWS Parameter Store in a batch.
+ *
+ * @param parameterNames The list of parameter names to fetch
+ * @return A map of parameter names to their values
+ * @throws IllegalArgumentException If the parameterNames list is null or empty
+ */
+ public Map<String, String> getParameters(List<String> parameterNames) {
+ if (parameterNames == null || parameterNames.isEmpty()) {
+ logger.error("Parameter names list cannot be null or empty");
+ throw new IllegalArgumentException("Parameter names list cannot be null or empty");
+ }
+
+ try {
+
+ GetParametersRequest request = new GetParametersRequest().withNames(parameterNames);
+ GetParametersResult result = ssmClient.getParameters(request);
+
+ // Map the result to a Map of parameter names and values
+ Map<String, String> parameters = new HashMap<>();
+ result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue()));
+
+ // Log any parameters that were not found
+ if (!result.getInvalidParameters().isEmpty()) {
+ logger.warn("The following parameters were not found: {}", result.getInvalidParameters());
+ }
+
+ return parameters;
+
+ } catch (AWSSimpleSystemsManagementException e) {
+ logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e);
+ throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e);
+
+ } catch (Exception e) {
+ logger.error("Unexpected error while fetching parameters: {}", parameterNames, e);
+ throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e);
+ }
+ }
+
+}
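A usage sketch (the parameter paths mirror those used elsewhere in this PR; the "cert" universe is illustrative):

```java
ParameterStoreUtil util = new ParameterStoreUtil();
// Single parameter
String isExperiment = util.getParameter("/cert/emodb/experiment/isExperiment");
// Batch fetch
Map<String, String> stepFnParams = util.getParameters(Arrays.asList(
        "/cert/emodb/stepfn/stateMachineArn",
        "/cert/emodb/stepfn/batchSize"));
```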
diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java
new file mode 100644
index 0000000000..ec4165d03f
--- /dev/null
+++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java
@@ -0,0 +1,62 @@
+package com.bazaarvoice.emodb.queue.core.stepfn;
+
+
+import com.amazonaws.services.stepfunctions.AWSStepFunctions;
+import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
+import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
+import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to interact with AWS Step Functions using AWS SDK v1.
+ */
+public class StepFunctionService {
+
+ private static final Logger logger = LoggerFactory.getLogger(StepFunctionService.class);
+
+ private final AWSStepFunctions stepFunctionsClient;
+
+ /**
+ * Constructor to initialize Step Function Client with AWS region and credentials.
+ */
+ public StepFunctionService() {
+ this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard()
+ .build();
+ }
+
+ /**
+ * Starts the execution of a Step Function with the given state machine ARN and input payload.
+ *
+ * @param stateMachineArn ARN of the state machine
+ * @param inputPayload Input for the state machine execution
+ * @param executionName Name to assign to this execution
+ * @throws IllegalArgumentException If the stateMachineArn is null or empty
+ */
+ public void startExecution(String stateMachineArn, String inputPayload, String executionName) {
+ if (stateMachineArn == null || stateMachineArn.isEmpty()) {
+ logger.error("State Machine ARN cannot be null or empty");
+ throw new IllegalArgumentException("State Machine ARN cannot be null or empty");
+ }
+
+ if (inputPayload == null) {
+ logger.warn("Input payload is null; using empty JSON object");
+ inputPayload = "{}"; // Default to empty payload if null
+ }
+
+ try {
+ StartExecutionRequest startExecutionRequest = new StartExecutionRequest()
+ .withStateMachineArn(stateMachineArn)
+ .withInput(inputPayload)
+ .withName(executionName);
+
+ StartExecutionResult startExecutionResult = stepFunctionsClient.startExecution(startExecutionRequest);
+
+ logger.info("Successfully started execution for state machine ARN: {}", stateMachineArn);
+ logger.debug("Execution ARN: {}", startExecutionResult.getExecutionArn());
+
+ } catch (Exception e) {
+ logger.error("Unexpected error occurred during Step Function execution: {}", e.getMessage(), e);
+ throw e;
+ }
+ }
+}
\ No newline at end of file
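A usage sketch tying this to startStepFunctionExecution above (the ARN and payload are illustrative):

```java
StepFunctionService sfn = new StepFunctionService();
sfn.startExecution(
        "arn:aws:states:us-east-1:111111111111:stateMachine:queue-drainer", // hypothetical ARN
        "{\"executionInput\":{\"queueName\":\"reviews\"}}",                 // payload shape as built above
        "reviews_" + System.currentTimeMillis());                          // execution name convention
```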
diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
index 90a0ea837d..0ac9a0ed89 100644
--- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
+++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java
@@ -4,7 +4,10 @@
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.job.api.JobType;
-import com.codahale.metrics.MetricRegistry;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
+import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
+import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.testng.annotations.Test;
import java.time.Clock;
@@ -38,7 +41,7 @@ public void testSizeCache() {
BaseEventStore mockEventStore = mock(BaseEventStore.class);
AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class),
- mock(JobHandlerRegistry.class), mock(JobType.class), clock, new MetricRegistry()){};
+ mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){};
// At limit=500, size estimate should be at 4800
// At limit=50, size estimate should be at 5000
diff --git a/sdk/pom.xml b/sdk/pom.xml
index e8f8b7eb5b..e264ab8e53 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/sor-api/pom.xml b/sor-api/pom.xml
index d7bc799c60..058e8c28ba 100644
--- a/sor-api/pom.xml
+++ b/sor-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/sor-client-common/pom.xml b/sor-client-common/pom.xml
index 8028d79623..f345505763 100644
--- a/sor-client-common/pom.xml
+++ b/sor-client-common/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/sor-client-jersey2/pom.xml b/sor-client-jersey2/pom.xml
index 4141cd9d99..dafa84a916 100644
--- a/sor-client-jersey2/pom.xml
+++ b/sor-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/sor-client/pom.xml b/sor-client/pom.xml
index 100d90210e..1496ad9d31 100644
--- a/sor-client/pom.xml
+++ b/sor-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/sor/pom.xml b/sor/pom.xml
index d2a61758ef..7112bfafe4 100644
--- a/sor/pom.xml
+++ b/sor/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/table/pom.xml b/table/pom.xml
index e4592dd6b0..7d962487cf 100644
--- a/table/pom.xml
+++ b/table/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/uac-api/pom.xml b/uac-api/pom.xml
index b9ba2a6008..7b15c45c05 100644
--- a/uac-api/pom.xml
+++ b/uac-api/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml
index 38984d2abd..a422db8bd9 100644
--- a/uac-client-jersey2/pom.xml
+++ b/uac-client-jersey2/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/uac-client/pom.xml b/uac-client/pom.xml
index 55aae7fdb8..82bfb686b4 100644
--- a/uac-client/pom.xml
+++ b/uac-client/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/web-local/pom.xml b/web-local/pom.xml
index 4535e14494..6692d548cc 100644
--- a/web-local/pom.xml
+++ b/web-local/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/web/pom.xml b/web/pom.xml
index eadb8ccbf7..a8a2619eae 100644
--- a/web/pom.xml
+++ b/web/pom.xml
@@ -6,7 +6,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
index 00c380af6a..b86b38943d 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
@@ -381,8 +381,6 @@ public Collection<String> getTablePlacements(@Authenticated Subject subject) {
_getTablePlacementsRequestsByApiKey.getUnchecked(subject.getId()).mark();
return _blobStore.getTablePlacements();
}
-
-
/**
* Retrieves the current version of a piece of content from the data store.
*/
@@ -600,4 +598,4 @@ private Supplier<InputStream> onceOnlySupplier(final InputStream in) {
return in;
};
}
-}
+}
\ No newline at end of file
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java
index c6dcd408fc..88e0a04dbc 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java
@@ -110,6 +110,20 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection<Object> messages) {
+ // Not partitioned--any server can write messages to Cassandra.
+ _queueService.sendAll(queue, messages, true);
+ return SuccessResponse.instance();
+ }
@POST
@Path("_sendbatch")
@@ -154,6 +168,18 @@ public long getMessageCount(@QueryParam("partitioned") BooleanParam partitioned,
}
}
+ @GET
+ @Path("{queue}/uncached_size")
+ @RequiresPermissions("queue|get_status|{queue}")
+ @Timed(name = "bv.emodb.dedupq.DedupQueueResource1.getUncachedMessageCount", absolute = true)
+ @ApiOperation(value = "Gets the uncached message count.",
+ notes = "Returns a long.",
+ response = long.class
+ )
+ public long getUncachedMessageCount(@PathParam("queue") String queue) {
+ return _queueService.getUncachedSize(queue);
+ }
+
@GET
@Path("{queue}/claimcount")
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java
index ff6334db05..e0a312c623 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java
@@ -35,6 +35,7 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -120,6 +121,24 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection<Object> events) {
+ // TODO: change query param name / type
+ // Not partitioned--any server can write messages to Cassandra.
+ _queueService.sendAll(queue, events, true);
+ return SuccessResponse.instance();
+ }
+
@POST
@Path("_sendbatch")
@Consumes(MediaType.APPLICATION_JSON)
@@ -162,6 +181,17 @@ public long getMessageCount(@PathParam("queue") String queue, @QueryParam("limit
}
}
+ @GET
+ @Path("{queue}/uncached_size")
+ @RequiresPermissions("queue|get_status|{queue}")
+ @Timed(name = "bv.emodb.queue.QueueResource1.getUncachedMessageCount", absolute = true)
+ @ApiOperation(value = "Gets the uncached message count.",
+ notes = "Returns a long.",
+ response = long.class
+ )
+ public long getUncachedMessageCount(@PathParam("queue") String queue) {
+ return _queueService.getUncachedSize(queue);
+ }
@GET
@Path("{queue}/claimcount")
diff --git a/yum/pom.xml b/yum/pom.xml
index adee7cf547..6ae9e54f04 100644
--- a/yum/pom.xml
+++ b/yum/pom.xml
@@ -4,7 +4,7 @@
com.bazaarvoice.emodb
emodb-parent
- 6.5.171-SNAPSHOT
+ 6.5.204-SNAPSHOT
../parent/pom.xml