diff --git a/abc.txt b/abc.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/auth/auth-client/pom.xml b/auth/auth-client/pom.xml index 14e80f2c35..ab46219a7b 100644 --- a/auth/auth-client/pom.xml +++ b/auth/auth-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-core/pom.xml b/auth/auth-core/pom.xml index 257be185db..47d5f4efaf 100644 --- a/auth/auth-core/pom.xml +++ b/auth/auth-core/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-store/pom.xml b/auth/auth-store/pom.xml index 3b9531494e..d26ad49c09 100644 --- a/auth/auth-store/pom.xml +++ b/auth/auth-store/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-util/pom.xml b/auth/auth-util/pom.xml index fe30721176..eb0a847c96 100644 --- a/auth/auth-util/pom.xml +++ b/auth/auth-util/pom.xml @@ -3,7 +3,7 @@ emodb com.bazaarvoice.emodb - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/blob-api/pom.xml b/blob-api/pom.xml index 85123d5a42..afd6ecd6ac 100644 --- a/blob-api/pom.xml +++ b/blob-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/blob-clients/blob-client-common/pom.xml b/blob-clients/blob-client-common/pom.xml index 2551dd9b1f..6a0eee4600 100644 --- a/blob-clients/blob-client-common/pom.xml +++ b/blob-clients/blob-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client-jersey2/pom.xml b/blob-clients/blob-client-jersey2/pom.xml index 1469a20558..ec09f2ce5b 100644 --- a/blob-clients/blob-client-jersey2/pom.xml +++ b/blob-clients/blob-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client/pom.xml b/blob-clients/blob-client/pom.xml index 8eb86f2ce1..5468bebabd 100644 --- a/blob-clients/blob-client/pom.xml +++ b/blob-clients/blob-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/blob/pom.xml b/blob/pom.xml index e4396c74cf..4fb2a01b7e 100644 --- a/blob/pom.xml +++ b/blob/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java index 8eba374154..6ab72adaa1 100644 --- a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java +++ b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/DefaultBlobStore.java @@ -474,4 +474,4 @@ private static void checkLegalBlobId(String blobId) { "Blob IDs must be ASCII strings between 1 and 255 characters in length. 
" + "Whitespace, ISO control characters and certain punctuation characters that aren't generally allowed in file names are excluded."); } -} +} \ No newline at end of file diff --git a/cachemgr/pom.xml b/cachemgr/pom.xml index c05b60ea79..0db4d08c7f 100644 --- a/cachemgr/pom.xml +++ b/cachemgr/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/common/api/pom.xml b/common/api/pom.xml index b8892b6428..d62a8bed9e 100644 --- a/common/api/pom.xml +++ b/common/api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/astyanax/pom.xml b/common/astyanax/pom.xml index b6eff20acf..4223f7b9e8 100644 --- a/common/astyanax/pom.xml +++ b/common/astyanax/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jax-rs-2/pom.xml b/common/client-jax-rs-2/pom.xml index 1f2e6db668..9e9c6703aa 100644 --- a/common/client-jax-rs-2/pom.xml +++ b/common/client-jax-rs-2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jersey2/pom.xml b/common/client-jersey2/pom.xml index 1a6fe97c68..6cab40d632 100644 --- a/common/client-jersey2/pom.xml +++ b/common/client-jersey2/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/client/pom.xml b/common/client/pom.xml index 640eeb51a0..31d3091f1f 100644 --- a/common/client/pom.xml +++ b/common/client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/dropwizard/pom.xml b/common/dropwizard/pom.xml index 8bb610aba2..dfc1de40a8 100644 --- a/common/dropwizard/pom.xml +++ b/common/dropwizard/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/jersey-client/pom.xml b/common/jersey-client/pom.xml index aa9d419736..f20cab0356 100644 --- a/common/jersey-client/pom.xml +++ b/common/jersey-client/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/json/pom.xml b/common/json/pom.xml index 917adb1936..ba557a73be 100644 --- a/common/json/pom.xml +++ b/common/json/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/stash/pom.xml b/common/stash/pom.xml index bb9f0e5b7b..79b959eedd 100644 --- a/common/stash/pom.xml +++ b/common/stash/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/uuid/pom.xml b/common/uuid/pom.xml index 69c7151265..31f76da7c4 100644 --- a/common/uuid/pom.xml +++ b/common/uuid/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/common/zookeeper/pom.xml b/common/zookeeper/pom.xml index 02e713b07c..29937806ae 100644 --- a/common/zookeeper/pom.xml +++ b/common/zookeeper/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/databus-api/pom.xml b/databus-api/pom.xml index 27ef872a85..c38ed11a39 100644 --- a/databus-api/pom.xml +++ b/databus-api/pom.xml @@ -6,7 +6,7 @@ 
com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-common/pom.xml b/databus-client-common/pom.xml index 5af9b9d379..4329e99b6f 100644 --- a/databus-client-common/pom.xml +++ b/databus-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-jersey2/pom.xml b/databus-client-jersey2/pom.xml index 7c2794e645..cb01c070d3 100644 --- a/databus-client-jersey2/pom.xml +++ b/databus-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/databus-client/pom.xml b/databus-client/pom.xml index 46b1d8c25d..37e78784d4 100644 --- a/databus-client/pom.xml +++ b/databus-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/databus/pom.xml b/databus/pom.xml index e966f653bb..5089064103 100644 --- a/databus/pom.xml +++ b/databus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/datacenter/pom.xml b/datacenter/pom.xml index d0582217b8..3f432b89bd 100644 --- a/datacenter/pom.xml +++ b/datacenter/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/event/pom.xml b/event/pom.xml index 8e3cd03e8b..cf7c210112 100644 --- a/event/pom.xml +++ b/event/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/job-api/pom.xml b/job-api/pom.xml index 8ac8b8687e..c622d59922 100644 --- a/job-api/pom.xml +++ b/job-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/job/pom.xml b/job/pom.xml index e4a44a41d2..d78652fb72 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/kafka/pom.xml b/kafka/pom.xml index b553098418..c070b6444b 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/megabus/pom.xml b/megabus/pom.xml index 2c91fd1561..1c579e5c28 100644 --- a/megabus/pom.xml +++ b/megabus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/parent/pom.xml b/parent/pom.xml index 27d58c9133..99ec131913 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -11,7 +11,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT pom EmoDB Parent @@ -635,11 +635,22 @@ aws-java-sdk-sns ${aws-sdk.version} + + com.amazonaws + aws-java-sdk-stepfunctions + ${aws-sdk.version} + com.amazonaws aws-java-sdk-sqs ${aws-sdk.version} + + + com.amazonaws + aws-java-sdk-ssm + ${aws-sdk.version} + com.amazonaws aws-java-sdk-sts diff --git a/plugins/pom.xml b/plugins/pom.xml index e67d3d650d..e540c2d93b 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/pom.xml b/pom.xml index 5f048e3d7b..f2dddf3f53 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT parent/pom.xml diff --git a/quality/integration/pom.xml 
b/quality/integration/pom.xml index f2fca66aa4..8246c931bb 100644 --- a/quality/integration/pom.xml +++ b/quality/integration/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../../parent/pom.xml diff --git a/quality/pom.xml b/quality/pom.xml index ffbd280391..8d6e028d0e 100644 --- a/quality/pom.xml +++ b/quality/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/pom.xml b/queue-api/pom.xml index b7b7eb2b33..54c0c33595 100644 --- a/queue-api/pom.xml +++ b/queue-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java index 41991d5b95..3b1fd2ffde 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthDedupQueueService.java @@ -64,4 +64,6 @@ public interface AuthDedupQueueService { /** Delete all messages in the queue, for debugging/testing. */ void purge(@Credential String apiKey, String queue); + + void sendAll(String apiKey, String queue, Collection messages, boolean isFlush); } diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java index a077eb2062..1bae1893f1 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/AuthQueueService.java @@ -18,6 +18,7 @@ public interface AuthQueueService { void sendAll(@Credential String apiKey, String queue, Collection messages); void sendAll(@Credential String apiKey, Map> messagesByQueue); + void sendAll(@Credential String apiKey, String queue, Collection messages, boolean isFlush); /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed @@ -64,4 +65,6 @@ public interface AuthQueueService { /** Delete all messages in the queue, for debugging/testing. */ void purge(@Credential String apiKey, String queue); + + } diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java index 3fcd38b5a4..3f5fd99950 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/BaseQueueService.java @@ -1,5 +1,6 @@ package com.bazaarvoice.emodb.queue.api; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -15,6 +16,8 @@ public interface BaseQueueService { void sendAll(Map> messagesByQueue); + void sendAll(String queue, Collection messages, boolean isFlush); + /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed * and not returned by the {@link #poll} method. @@ -24,6 +27,10 @@ public interface BaseQueueService { */ long getMessageCount(String queue); + default long getUncachedSize(String queue){ + return 0; + } + /** * Counts the total number of messages for the specified queue, accurate up to the specified limit. 
Beyond the * specified limit the message count will be a rough estimate, allowing the caller to make the trade-off between @@ -60,4 +67,4 @@ public interface BaseQueueService { /** Delete all messages in the queue, for debugging/testing. */ void purge(String queue); -} +} \ No newline at end of file diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java index 12ab97a45a..a6dc77515b 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/DedupQueueService.java @@ -15,6 +15,8 @@ public interface DedupQueueService extends BaseQueueService { void sendAll(Map> messagesByQueue); + void sendAll(String queue, Collectionmessages, boolean isFlush); + /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed * and not returned by the {@link #poll} method. diff --git a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java index c87740330c..4d533c755e 100644 --- a/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java +++ b/queue-api/src/main/java/com/bazaarvoice/emodb/queue/api/QueueService.java @@ -13,8 +13,12 @@ public interface QueueService extends BaseQueueService { void sendAll(String queue, Collection messages); + void sendAll(Map> messagesByQueue); + //Overloaded sendAll method to send to cassandra + void sendAll(String queue, Collection messages, boolean isFlush); + /** * Counts pending messages for the specified queue. The count will include messages that are currently claimed * and not returned by the {@link #poll} method. 
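[Note] The hunks above add a new sendAll(queue, messages, isFlush) overload to AuthQueueService, BaseQueueService, DedupQueueService and QueueService, plus a default getUncachedSize(queue). Below is a minimal caller-side sketch of the new overload; it is illustrative only and not part of the patch: the queueService variable, queue name and payload are hypothetical, the generic type of the messages parameter is not visible in this diff, and the flag semantics (true means the batch comes from the already-validated Kafka consumer path and is written straight to the Cassandra-backed event store) follow the AbstractQueueService implementation later in this diff.

// Illustrative sketch — not part of the patch.
import com.bazaarvoice.emodb.queue.api.QueueService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class SendAllFlushExample {
    static void resendToEventStore(QueueService queueService) {
        Collection<Object> messages = new ArrayList<>();
        messages.add(Collections.singletonMap("event", "review-created"));
        // This overload writes directly to the backing event store (Cassandra) instead of
        // routing through Kafka like the other sendAll variants; when the flag is true,
        // queue-name validation is skipped because the messages come from the Kafka
        // consumer path and were validated when originally enqueued.
        queueService.sendAll("polloi:provision", messages, true);
    }
}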
diff --git a/queue-client-common/pom.xml b/queue-client-common/pom.xml index d71aae5218..d61cd7eb60 100644 --- a/queue-client-common/pom.xml +++ b/queue-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java index b8aaa83667..7e01cd8b91 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java @@ -75,6 +75,25 @@ public void sendAll(String apiKey, String queue, Collection messages) { } } + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + requireNonNull(queue, "queue"); + requireNonNull(messages, "messages"); + if (messages.isEmpty()) { + return; + } + try { + URI uri = _queueService.clone() + .segment(queue, "sendbatch") + .build(); + _client.resource(uri) + .type(MediaType.APPLICATION_JSON_TYPE) + .header(ApiKeyRequest.AUTHENTICATION_HEADER, apiKey) + .post(messages); + } catch (EmoClientException e) { + throw convertException(e); + } + } + // Any server can handle sending messages, no need for @PartitionKey public void sendAll(String apiKey, Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java index 49d4c240f2..01fa7830eb 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java @@ -38,6 +38,11 @@ public void sendAll(Map> messagesByQueue) { _authDedupQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authDedupQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authDedupQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java index 683af2162d..92769441dd 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java +++ b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueClient.java @@ -32,6 +32,7 @@ public QueueClient(URI endPoint, boolean partitionSafe, EmoClient client) { super(endPoint, partitionSafe, client); } + @Override public long getMessageCount(String apiKey, String queue) { // Any server can handle this request, no need for @PartitionKey diff --git a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java index 29f8fd4ae6..714897a36e 100644 --- a/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java +++ 
b/queue-client-common/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java @@ -38,6 +38,11 @@ public void sendAll(Map> messagesByQueue) { _authQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client-jersey2/pom.xml b/queue-client-jersey2/pom.xml index 2eb7d7ceef..b1de7085d4 100644 --- a/queue-client-jersey2/pom.xml +++ b/queue-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java index bd4859d58f..7e315932b7 100644 --- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java +++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/AbstractQueueClient.java @@ -67,6 +67,23 @@ public void sendAll(String apiKey, String queue, Collection messages) { .post(messages)); } + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + requireNonNull(queue, "queue"); + requireNonNull(messages, "messages"); + if (messages.isEmpty()) { + return; + } + URI uri = _queueService.clone() + .segment(queue, "sendbatch") + .build(); + + Failsafe.with(_retryPolicy) + .run(() -> _client.resource(uri) + .type(MediaType.APPLICATION_JSON_TYPE) + .header(ApiKeyRequest.AUTHENTICATION_HEADER, apiKey) + .post(messages)); + } + public void sendAll(String apiKey, Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); if (messagesByQueue.isEmpty()) { diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java index f37405182b..19df050f64 100644 --- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java +++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/DedupQueueServiceAuthenticatorProxy.java @@ -36,6 +36,11 @@ public void sendAll(Map> messagesByQueue) { _authDedupQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, Collection messages, boolean isFlush) { + _authDedupQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authDedupQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java index 144b991f32..fef04a42e1 100644 --- a/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java +++ b/queue-client-jersey2/src/main/java/com/bazaarvoice/emodb/queue/client/QueueServiceAuthenticatorProxy.java @@ -35,6 +35,11 @@ public void sendAll(Map> messagesByQueue) { _authQueueService.sendAll(_apiKey, messagesByQueue); } + @Override + public void sendAll(String queue, 
Collection messages, boolean isFlush) { + _authQueueService.sendAll(_apiKey, queue, messages, isFlush); + } + @Override public MoveQueueStatus getMoveStatus(String reference) { return _authQueueService.getMoveStatus(_apiKey, reference); diff --git a/queue-client/pom.xml b/queue-client/pom.xml index 53a53dc625..e4c065cc3a 100644 --- a/queue-client/pom.xml +++ b/queue-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/queue/pom.xml b/queue/pom.xml index c38a0b5332..67a621be8b 100644 --- a/queue/pom.xml +++ b/queue/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml @@ -75,20 +75,24 @@ com.fasterxml.jackson.core jackson-annotations + + com.fasterxml.jackson.core + jackson-core + com.fasterxml.jackson.core jackson-databind ${jackson.databind.version} - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-annotations - - + + + + + + + + + + javax.validation @@ -98,6 +102,11 @@ org.apache.curator curator-framework + + org.slf4j + slf4j-api + + @@ -110,5 +119,21 @@ testng test + + org.apache.kafka + kafka-clients + + + com.amazonaws + aws-java-sdk-core + + + com.amazonaws + aws-java-sdk-stepfunctions + + + com.amazonaws + aws-java-sdk-ssm + diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java index 20ff816a5c..b42d33b02c 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/QueueModule.java @@ -21,6 +21,9 @@ import com.bazaarvoice.emodb.queue.core.DefaultDedupQueueService; import com.bazaarvoice.emodb.queue.core.DefaultQueueService; import com.bazaarvoice.emodb.queue.core.QueueChannelConfiguration; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.bazaarvoice.ostrich.HostDiscovery; import com.codahale.metrics.MetricRegistry; import com.google.common.base.Supplier; @@ -82,6 +85,14 @@ protected void configure() { bind(new TypeLiteral>() {}).annotatedWith(DedupEnabled.class).toInstance(Suppliers.ofInstance(true)); install(new EventStoreModule("bv.emodb.queue", _metricRegistry)); + // Bind Kafka services + bind (KafkaAdminService.class).asEagerSingleton(); + bind(KafkaProducerService.class).asEagerSingleton(); + + // Bind Step Function Service + bind(StepFunctionService.class).asEagerSingleton(); + + // Bind the Queue instance that the rest of the application will consume bind(QueueService.class).to(DefaultQueueService.class).asEagerSingleton(); expose(QueueService.class); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java index f07083616d..0793a08b2a 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java @@ -17,11 +17,17 @@ import com.bazaarvoice.emodb.queue.api.MoveQueueStatus; import com.bazaarvoice.emodb.queue.api.Names; import com.bazaarvoice.emodb.queue.api.UnknownMoveException; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaConfig; +import 
com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Supplier; +import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -33,39 +39,58 @@ import java.nio.ByteBuffer; import java.time.Clock; import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + abstract class AbstractQueueService implements BaseQueueService { + private final Logger _log = LoggerFactory.getLogger(AbstractQueueService.class); private final BaseEventStore _eventStore; private final JobService _jobService; private final JobType _moveQueueJobType; private final LoadingCache> _queueSizeCache; - private final Meter _sendAllMeterAQS; - private final Meter _sendAllMeterNullAQS; - - private final Meter _pollAQS; - private final Meter _pollNullAQS; + private final KafkaAdminService adminService; + private final KafkaProducerService producerService; + // Configuration keys for Kafka topic settings + private static final Integer TOPIC_PARTITION_COUNT = 3; + private static final Short TOPIC_REPLICATION_FACTOR = 3; public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024; + private final StepFunctionService stepFunctionService; + private final ParameterStoreUtil parameterStoreUtil; + + // Cache for the isExperiment value with a TTL of 5 minutes + private final Cache experimentCache = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); + private static final String IS_EXPERIMENT = "isExperiment"; + + private final Cache> allowedQueuesCache = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); + private static final String ALLOWED_QUEUES = "allowedQueues"; + + private static final String UNIVERSE = KafkaConfig.getUniverseFromEnv(); protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, JobType moveQueueJobType, - Clock clock, MetricRegistry metricRegistry) { + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { _eventStore = eventStore; _jobService = jobService; _moveQueueJobType = moveQueueJobType; + this.adminService = adminService; + this.producerService = producerService; + this.stepFunctionService = stepFunctionService; + this.parameterStoreUtil = new ParameterStoreUtil(); - registerMoveQueueJobHandler(jobHandlerRegistry); + registerMoveQueueJobHandler(jobHandlerRegistry); _queueSizeCache = CacheBuilder.newBuilder() .expireAfterWrite(15, TimeUnit.SECONDS) .maximumSize(2000) @@ -77,11 +102,6 @@ public Map.Entry load(SizeCacheKey key) return Maps.immutableEntry(internalMessageCountUpTo(key.channelName, key.limitAsked), 
key.limitAsked); } }); - _sendAllMeterAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllAQS")); - _sendAllMeterNullAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllNullAQS")); - _pollAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollAQS")); - _pollNullAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollNullAQS")); - } private void registerMoveQueueJobHandler(JobHandlerRegistry jobHandlerRegistry) { @@ -109,42 +129,148 @@ public MoveQueueResult run(MoveQueueRequest request) }); } + /** + * Retrieves the value of the "isExperiment" flag from the cache if available. + * If the value is not present in the cache or the cache has expired, it fetches the value + * from AWS Parameter Store and stores it in the cache. + *

+ * The cached value has a TTL (Time-To-Live) of 1 minute, after which it will be refreshed + * from the Parameter Store on the next access. + *

+ * + * @return {@code true} if the experiment is still running, otherwise {@code false}. + * @throws RuntimeException if there is an error fetching the value from the cache or Parameter Store. + */ + private boolean getIsExperimentValue() { + try { + // Attempt to retrieve from cache + return experimentCache.get(IS_EXPERIMENT, () -> { + + Boolean checkExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/isExperiment")); + _log.info("IS_EXPERIMENT is refreshed {}", checkExperiment); + // If absent or expired, fetch from Parameter Store and cache the result + return checkExperiment; + }); + } catch (Exception e) { + // Handle any errors that might occur while accessing the cache or Parameter Store + throw new RuntimeException("Error fetching experiment flag", e); + } + } + @Override public void send(String queue, Object message) { - sendAll(Collections.singletonMap(queue, Collections.singleton(message))); + //boolean isExperiment = Boolean.parseBoolean(parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/isExperiment")); + boolean isExperiment = getIsExperimentValue(); + if (!isExperiment) { + // Experiment is over now, send everything to Kafka + sendAll(Collections.singletonMap(queue, Collections.singleton(message))); + } else { + List allowedQueues = fetchAllowedQueues(); + // Experiment is still running, check if the queue is allowed + if (allowedQueues.contains(queue)) { + // Send to Kafka, only if it's an allowed queue + sendAll(Collections.singletonMap(queue, Collections.singleton(message))); + } else { + // Send to Cassandra (rollback plan) + sendAll(queue, Collections.singleton(message), false); + } + } } @Override public void sendAll(String queue, Collection messages) { - sendAll(Collections.singletonMap(queue, messages)); + List allowedQueues = fetchAllowedQueues(); + boolean isExperiment = getIsExperimentValue(); + if (!isExperiment) { + // experiment is over now, send everything to kafka + sendAll(Collections.singletonMap(queue, messages)); + } else { + // Experiment is still running, check if the queue is allowed + if(allowedQueues.contains(queue)){ + //send kafka , only if its allowed queue + sendAll(Collections.singletonMap(queue, messages)); + } + else { + //send to cassandra, (rollback plan) + sendAll(queue, messages, false); + } + } + } + + + private void validateMessage(Object message) { + // Check if the message is valid using JsonValidator + ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message)); + + // Check if the message size exceeds the allowed limit + checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, + "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); + + } + + private void validateQueue(String queue, Collection messages) { + requireNonNull(queue, "Queue name cannot be null"); + requireNonNull(messages, "Messages collection cannot be null"); + + // Check if the queue name is legal + checkLegalQueueName(queue); } @Override public void sendAll(Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); - if(messagesByQueue.keySet().isEmpty()){ - _sendAllMeterNullAQS.mark(); - } else { - _sendAllMeterAQS.mark(messagesByQueue.keySet().size()); - } - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); for (Map.Entry> entry : messagesByQueue.entrySet()) { String 
queue = entry.getKey(); Collection messages = entry.getValue(); - checkLegalQueueName(queue); - requireNonNull(messages, "messages"); + validateQueue(queue, messages); - List events = Lists.newArrayListWithCapacity(messages.size()); - for (Object message : messages) { - ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message)); - checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); + List events = Lists.newArrayListWithCapacity(messages.size()); - events.add(messageByteBuffer); + // Validate each message + for (Object message : messages) { + validateMessage(message); + events.add(message.toString()); } builder.putAll(queue, events); } + + Multimap eventsByChannel = builder.build(); + + String queueType = determineQueueType(); + for (Map.Entry> topicEntry : eventsByChannel.asMap().entrySet()) { + String queueName= topicEntry.getKey(); + String topic = "dsq-" + (("dedupq".equals(queueType)) ? "dedup-" + queueName : queueName); + // Check if the topic exists, if not create it and execute Step Function + if (!adminService.createTopicIfNotExists(topic, TOPIC_PARTITION_COUNT, TOPIC_REPLICATION_FACTOR, queueType)) { + Map parameters = fetchStepFunctionParameters(); + // Execute Step Function after topic creation + startStepFunctionExecution(parameters, queueType,queueName, topic); + } + producerService.sendMessages(topic, topicEntry.getValue(), queueType); + _log.info("Messages sent to topic: {}", topic); + } + } + + @Override + public void sendAll(String queue, Collection messages, boolean fromKafka) { + //incoming message from kafka consume, save to cassandra + if(!fromKafka){ + validateQueue(queue, messages); + } + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + List events = Lists.newArrayListWithCapacity(messages.size()); + + + for (Object message : messages) { + ByteBuffer messageByteBuffer = MessageSerializer.toByteBuffer(JsonValidator.checkValid(message)); + checkArgument(messageByteBuffer.limit() <= MAX_MESSAGE_SIZE_IN_BYTES, + "Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size"); + events.add(messageByteBuffer); + } + builder.putAll(queue, events); Multimap eventsByChannel = builder.build(); _eventStore.addAll(eventsByChannel); @@ -155,6 +281,11 @@ public long getMessageCount(String queue) { return getMessageCountUpTo(queue, Long.MAX_VALUE); } + @Override + public long getUncachedSize(String queue){ + return internalMessageCountUpTo(queue, Long.MAX_VALUE); + } + @Override public long getMessageCountUpTo(String queue, long limit) { // We get the size from cache as a tuple of size, and the limit used to estimate that size @@ -196,14 +327,8 @@ public List poll(String queue, Duration claimTtl, int limit) { checkLegalQueueName(queue); checkArgument(claimTtl.toMillis() >= 0, "ClaimTtl must be >=0"); checkArgument(limit > 0, "Limit must be >0"); - List response = toMessages(_eventStore.poll(queue, claimTtl, limit)); - if(response.isEmpty()){ - _pollNullAQS.mark(); - } - else{ - _pollAQS.mark(response.size()); - } - return response; + + return toMessages(_eventStore.poll(queue, claimTtl, limit)); } @Override @@ -219,7 +344,6 @@ public void renew(String queue, Collection messageIds, Duration claimTtl public void acknowledge(String queue, Collection messageIds) { checkLegalQueueName(queue); 
requireNonNull(messageIds, "messageIds"); - _eventStore.delete(queue, messageIds, true); } @@ -298,4 +422,104 @@ private void checkLegalQueueName(String queue) { "Allowed punctuation characters are -.:@_ and the queue name may not start with a single underscore character. " + "An example of a valid table name would be 'polloi:provision'."); } -} + /** + * Fetches the necessary Step Function parameters from AWS Parameter Store. + */ + private Map fetchStepFunctionParameters() { + List parameterNames = Arrays.asList( + "/"+ UNIVERSE+ "/emodb/stepfn/stateMachineArn", + "/"+ UNIVERSE+ "/emodb/stepfn/queueThreshold", + "/"+ UNIVERSE+ "/emodb/stepfn/batchSize", + "/"+ UNIVERSE+ "/emodb/stepfn/interval" + ); + + try { + return parameterStoreUtil.getParameters(parameterNames); + } catch (Exception e) { + _log.error("Failed to fetch Step Function parameters from Parameter Store", e); + throw new RuntimeException("Error fetching Step Function parameters", e); + } + } + + /** + * Executes the Step Function for a given topic after it has been created. + */ + + private void startStepFunctionExecution(Map parameters, String queueType, String queueName, String topic) { + try { + String stateMachineArn = parameters.get( "/"+ UNIVERSE+ "/emodb/stepfn/stateMachineArn"); + int queueThreshold = Integer.parseInt(parameters.get( "/"+ UNIVERSE+"/emodb/stepfn/queueThreshold")); + int batchSize = Integer.parseInt(parameters.get ("/"+ UNIVERSE+ "/emodb/stepfn/batchSize")); + int interval = Integer.parseInt(parameters.get( "/"+ UNIVERSE+"/emodb/stepfn/interval")); + + String inputPayload = createInputPayload(queueThreshold, batchSize, queueType, queueName, topic, interval); + + // Create the timestamp + String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds + + // Check if queueType is "dedupq" and prepend "D" to execution name if true + String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp; + + // Start the Step Function execution + stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName); + + _log.info("Step Function executed for topic: {} with executionName: {}", topic, executionName); + } catch (Exception e) { + _log.error("Error executing Step Function for topic: {}", topic, e); + throw new RuntimeException("Error executing Step Function for topic: " + topic, e); + } + } + + /** + * Determines the queue type based on the event store. 
+ */ + private String determineQueueType() { + if (_eventStore.getClass().getName().equals("com.bazaarvoice.emodb.event.dedup.DefaultDedupEventStore")) { + return "dedupq"; + } + return "queue"; + } + + private List fetchAllowedQueues() { + try { + return new ArrayList<>(allowedQueuesCache.get(ALLOWED_QUEUES, this::fetchAllowedQueuesFromParamStore)); + } catch (Exception e) { + // Handle the case when the parameter is not found or fetching fails + _log.error("Error fetching allowedQueues in fetchAllowedQueues: " + e.getMessage()); + return Collections.singletonList(""); // Default to an empty list if the parameter is missing + } + + } + + private Set fetchAllowedQueuesFromParamStore() { + try { + // Fetch the 'allowedQueues' parameter using ParameterStoreUtil + String allowedQueuesStr = parameterStoreUtil.getParameter("/" + UNIVERSE + "/emodb/experiment/allowedQueues"); + _log.info("ALLOWED_QUEUES is refreshed"); + return new HashSet<>(Arrays.asList(allowedQueuesStr.split(","))); + } catch (Exception e) { + // Handle the case when the parameter is not found or fetching fails + _log.error("Error fetching allowedQueues: " + e.getMessage()); + return new HashSet<>(Collections.singletonList("")); // Default to an empty list if the parameter is missing + } + } + + private String createInputPayload(int queueThreshold, int batchSize, String queueType, String queueName, String topicName, int interval) { + Map payloadData = new HashMap<>(); + payloadData.put("queueThreshold", queueThreshold); + payloadData.put("batchSize", batchSize); + payloadData.put("queueType", queueType); + payloadData.put("queueName", queueName); + payloadData.put("topicName", topicName); + payloadData.put("interval", interval); + Map wrappedData = new HashMap<>(); + wrappedData.put("executionInput", payloadData); // Wrap the data + try { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(wrappedData); // Convert wrapped data to JSON + } catch (JsonProcessingException e) { + _log.error("Error while converting map to JSON", e); + return "{}"; // Return empty JSON object on error + } + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java index 0955166218..9ec8d36606 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java @@ -4,7 +4,9 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.DedupQueueService; -import com.codahale.metrics.MetricRegistry; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; import java.time.Clock; @@ -12,7 +14,7 @@ public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService { @Inject public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, MetricRegistry metricRegistry) { - super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock, metricRegistry); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService 
stepFunctionService) { + super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService ); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java index 947572208d..524ca50033 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java @@ -4,7 +4,9 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.QueueService; -import com.codahale.metrics.MetricRegistry; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; import com.google.inject.Inject; import java.time.Clock; @@ -12,7 +14,7 @@ public class DefaultQueueService extends AbstractQueueService implements QueueService { @Inject public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock, MetricRegistry metricRegistry) { - super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock, metricRegistry); + Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) { + super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java index 47e24ccf38..b349b19298 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedDedupQueueService.java @@ -91,6 +91,11 @@ public void purge(String apiKey, String queue) { _dedupQueueService.purge(queue); } + @Override + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + _dedupQueueService.sendAll(queue, messages, isFlush); + } + @Override public void sendAll(String apiKey, Map> messagesByQueue) { _dedupQueueService.sendAll(messagesByQueue); diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java index 5ceea10a8e..cdafc8935e 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/TrustedQueueService.java @@ -95,4 +95,9 @@ public void purge(String apiKey, String queue) { public void sendAll(String apiKey, Map> messagesByQueue) { _queueService.sendAll(messagesByQueue); } + + @Override + public void sendAll(String apiKey, String queue, Collection messages, boolean isFlush) { + + } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java new file mode 100644 index 0000000000..9621617ba3 --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaAdminService.java @@ -0,0 +1,120 @@ +package com.bazaarvoice.emodb.queue.core.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import 
org.apache.kafka.clients.admin.NewTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import java.util.Collections; + +public class KafkaAdminService { + private static final Logger _log = LoggerFactory.getLogger(KafkaAdminService.class); + private final AdminClient adminClient; + // Cache for the list of all topics with a TTL of 10 minutes + private final Cache> topicListCache = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.MINUTES) + .build(); + + private static final String TOPIC_LIST_KEY = "allTopics"; + + + public KafkaAdminService() { + this.adminClient = AdminClient.create(KafkaConfig.getAdminProps()); + } + + /** + * Creates a new Kafka topic with the specified configurations. + * + * @param topic The name of the topic. + * @param numPartitions Number of partitions. + * @param replicationFactor Replication factor. + */ + public Boolean createTopicIfNotExists(String topic, int numPartitions, short replicationFactor, String queueType) { + Boolean isExisting =isTopicExists(topic); + if (! isExisting) { + //create the topic now + NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor); + try { + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + addToCache(topic); + _log.info("Created topic: {} with numPartitions: {} and replication factor {} ", topic, numPartitions, replicationFactor); + } catch (Exception e) { + _log.error("Error creating topic {}: ", topic, e); + throw new RuntimeException(e); + } + } + return isExisting; + } + public void addToCache(String topic){ + Set topics = topicListCache.getIfPresent(TOPIC_LIST_KEY); + if (topics == null) { + topics = new HashSet<>(); + } else { + // Create a new mutable Set if the existing one is unmodifiable + topics = new HashSet<>(topics); + } + topics.add(topic); + topicListCache.put(TOPIC_LIST_KEY, topics); + _log.info("Added newly created topic to cache: {}", topic); + } + + + /** + * Checks if a Kafka topic exists by using a cache to store the list of all topics. + * If the cache entry has expired or the cache is empty, it queries the Kafka AdminClient for the topic list. + *

+ * The cached list has a TTL (Time-To-Live) of 1 minute, after which it will be refreshed + * from Kafka on the next access. + *

+ * + * @param topic the name of the Kafka topic to check + * @return {@code true} if the topic exists, otherwise {@code false}. + * @throws RuntimeException if there is an error fetching the topic list or checking if the topic exists. + */ + public boolean isTopicExists(String topic) { + try { + // Retrieve the list of topics from the cache + Set topics = topicListCache.get(TOPIC_LIST_KEY, this::fetchTopicListFromKafka); + + // Check if the given topic is in the cached list + return topics.contains(topic); + } catch (ExecutionException e) { + _log.error("Failed to check if topic exists: {}", topic, e); + throw new RuntimeException("Error checking if topic exists", e); + } + } + + /** + * Fetches the list of all topic names from Kafka AdminClient. + * This method is called only when the cache is expired or empty. + * + * @return a Set containing all topic names. + * @throws ExecutionException if there is an error fetching the topic list from Kafka. + */ + private Set fetchTopicListFromKafka() throws ExecutionException { + try { + _log.info("Fetching topic list from Kafka"); + return adminClient.listTopics().names().get(); + } catch (Exception e) { + _log.error("Error fetching topic list from Kafka", e); + throw new ExecutionException(e); + } + } + + /** + * Closes the AdminClient to release resources. + */ + public void close() { + if (adminClient != null) { + adminClient.close(); + } + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java new file mode 100644 index 0000000000..13011e8bcc --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java @@ -0,0 +1,157 @@ +package com.bazaarvoice.emodb.queue.core.kafka; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; +import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; +import com.amazonaws.services.simplesystemsmanagement.model.Parameter; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +public class KafkaConfig { + private static String bootstrapServersConfig; + private static String batchSizeConfig; + private static String retriesConfig; + private static String lingerMsConfig; + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + // Static SSM Client and configuration using AWS SDK v1 + private static final AWSSimpleSystemsManagement ssmClient = AWSSimpleSystemsManagementClientBuilder + .standard() + .build(); + + + static { + try { + final String UNIVERSE = getUniverseFromEnv(); + // Load configurations from SSM during static initialization + Map parameterValues = getParameterValues( + Arrays.asList( + "/" + UNIVERSE + 
"/emodb/kafka/batchSize", + "/" + UNIVERSE + "/emodb/kafka/retries", + "/" + UNIVERSE + "/emodb/kafka/lingerMs", + "/" + UNIVERSE + "/emodb/kafka/bootstrapServers" + ) + ); + + // Set configurations with fallback to defaults if not present + // Sets the batch size for Kafka producer, which controls the amount of data to batch before sending. + batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384"); + + // Sets the number of retry attempts for failed Kafka message sends. + retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3"); + + // Sets the number of milliseconds a producer is willing to wait before sending a batch out + lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1"); + + // Configures the Kafka broker addresses for producer connections. + bootstrapServersConfig = parameterValues.get("/" + UNIVERSE + "/emodb/kafka/bootstrapServers"); + + logger.info("Kafka configurations loaded successfully from SSM."); + } catch (AmazonServiceException e) { + logger.error("Failed to load configurations from SSM. Using default values.", e); + throw e; + } + catch (Exception e) { + logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e); + throw e; + } + } + + public static String getUniverseFromEnv() { + String filePath = "/etc/environment"; + logger.info("Reading environment file: " + filePath); + Properties environmentProps = new Properties(); + + try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + String line; + while ((line = reader.readLine()) != null) { + // Skip empty lines or comments + if (line.trim().isEmpty() || line.trim().startsWith("#")) { + continue; + } + // Split the line into key-value pair + String[] parts = line.split("=", 2); + logger.info("parts: " + Arrays.toString(parts)); + if (parts.length == 2) { + String key = parts[0].trim(); + String value = parts[1].trim(); + // Remove any surrounding quotes from value + value = value.replace("\"", ""); + environmentProps.put(key, value); + } + } + // Access the environment variables + return environmentProps.getProperty("UNIVERSE"); + } catch (IOException e) { + logger.error("Error reading environment file: " + e.getMessage()); + throw new RuntimeException("Error reading environment file: " + e.getMessage()); + } + } + // Fetch parameters from AWS SSM using AWS SDK v1 + private static Map getParameterValues(List parameterNames) { + try { + GetParametersRequest request = new GetParametersRequest() + .withNames(parameterNames) + .withWithDecryption(true); + + GetParametersResult response = ssmClient.getParameters(request); + + return response.getParameters().stream() + .collect(Collectors.toMap(Parameter::getName, Parameter::getValue)); + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameters from SSM.", e); + throw e; // Rethrow or handle the exception if necessary + } + } + + // Kafka Producer properties + public static Properties getProducerProps() { + Properties producerProps = new Properties(); + + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + producerProps.put(ProducerConfig.RETRIES_CONFIG, 
Integer.parseInt(retriesConfig)); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMsConfig)); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSizeConfig)); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Default buffer memory setting + logger.info("Kafka Producer properties initialized."); + return producerProps; + } + + // Kafka Admin properties + public static Properties getAdminProps() { + Properties adminProps = new Properties(); + + adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + logger.info("Kafka Admin properties initialized."); + return adminProps; + } + + // Ensure the SSM client is closed when the application shuts down + public static void shutdown() { + if (ssmClient != null) { + try { + ssmClient.shutdown(); + logger.info("SSM client closed successfully."); + } catch (Exception e) { + logger.error("Error while closing SSM client.", e); + } + } + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java new file mode 100644 index 0000000000..1a7cc72eee --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaProducerService.java @@ -0,0 +1,66 @@ +package com.bazaarvoice.emodb.queue.core.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Future; + +public class KafkaProducerService { + private static final Logger _log = LoggerFactory.getLogger(KafkaProducerService.class); + private final KafkaProducer producer; // Changed to String + + public KafkaProducerService() { + this.producer = new KafkaProducer<>(KafkaConfig.getProducerProps()); + _log.info("KafkaProducerService initialized with producer properties: {}", KafkaConfig.getProducerProps()); + } + + /** + * Sends each message from the collection to the specified Kafka topic separately. + * + * @param topic The Kafka topic. + * @param events The collection of messages to be sent. + */ + public void sendMessages(String topic, Collection events, String queueType) { + LocalDateTime startTime = LocalDateTime.now(); + _log.info("Sending {} messages to topic '{}'", events.size(), topic); + List> futures = new ArrayList<>(); + // Use async sendMessage and collect futures + for (String event : events) { + futures.add(producer.send(new ProducerRecord<>(topic, event))); + } + + // Wait for all futures to complete + for (Future future : futures) { + try { + future.get(); // Only blocks if a future is not yet complete + } catch (Exception e) { + _log.error("Error while sending message to Kafka: {}", e.getMessage()); + throw new RuntimeException("Error sending messages to Kafka", e); + } + } + _log.info("Finished sending messages to topic '{}' time taken : {} milliseconds", topic, Duration.between(startTime,LocalDateTime.now()).toMillis()); + } + + + /** + * Closes the producer to release resources. 
+ */ + public void close() { + _log.info("Closing Kafka producer."); + try { + producer.flush(); + producer.close(); + } catch (Exception e) { + _log.error("Error while closing Kafka producer: ", e); + throw e; + } + } +} \ No newline at end of file diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java new file mode 100644 index 0000000000..d2d65c144f --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/ssm/ParameterStoreUtil.java @@ -0,0 +1,107 @@ +package com.bazaarvoice.emodb.queue.core.ssm; + +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement; +import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder; +import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParameterResult; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersRequest; +import com.amazonaws.services.simplesystemsmanagement.model.GetParametersResult; +import com.amazonaws.services.simplesystemsmanagement.model.ParameterNotFoundException; +import com.amazonaws.services.simplesystemsmanagement.model.AWSSimpleSystemsManagementException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility class for interacting with AWS Parameter Store using AWS SDK v1. + */ +public class ParameterStoreUtil { + + private static final Logger logger = LoggerFactory.getLogger(ParameterStoreUtil.class); + private final AWSSimpleSystemsManagement ssmClient; + + /** + * Constructor to initialize the SSM client + */ + public ParameterStoreUtil() { + // Create SSM client with default credentials and region + ssmClient = AWSSimpleSystemsManagementClientBuilder.standard() + .build(); + } + + /** + * Fetches a parameter from AWS Parameter Store. + * + * @param parameterName The name of the parameter to fetch + * @return The value of the parameter + * @throws IllegalArgumentException If the parameterName is null or empty + */ + public String getParameter(String parameterName) { + if (parameterName == null || parameterName.isEmpty()) { + logger.error("Parameter name cannot be null or empty"); + throw new IllegalArgumentException("Parameter name cannot be null or empty"); + } + + try { + + GetParameterRequest request = new GetParameterRequest().withName(parameterName); + GetParameterResult result = ssmClient.getParameter(request); + return result.getParameter().getValue(); + + } catch (ParameterNotFoundException e) { + logger.error("Parameter not found: {}", parameterName, e); + throw new RuntimeException("Parameter not found: " + parameterName, e); + + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameter from AWS SSM: {}", e.getMessage(), e); + throw new RuntimeException("Error fetching parameter from AWS SSM: " + parameterName, e); + + } catch (Exception e) { + logger.error("Unexpected error while fetching parameter: {}", parameterName, e); + throw new RuntimeException("Unexpected error fetching parameter: " + parameterName, e); + } + } + + /** + * Fetches multiple parameters from AWS Parameter Store in a batch. 
+ * + * @param parameterNames The list of parameter names to fetch + * @return A map of parameter names to their values + * @throws IllegalArgumentException If the parameterNames list is null or empty + */ + public Map<String, String> getParameters(List<String> parameterNames) { + if (parameterNames == null || parameterNames.isEmpty()) { + logger.error("Parameter names list cannot be null or empty"); + throw new IllegalArgumentException("Parameter names list cannot be null or empty"); + } + + try { + + GetParametersRequest request = new GetParametersRequest().withNames(parameterNames); + GetParametersResult result = ssmClient.getParameters(request); + + // Map the result to a Map of parameter names and values + Map<String, String> parameters = new HashMap<>(); + result.getParameters().forEach(param -> parameters.put(param.getName(), param.getValue())); + + // Log any parameters that were not found + if (!result.getInvalidParameters().isEmpty()) { + logger.warn("The following parameters were not found: {}", result.getInvalidParameters()); + } + + return parameters; + + } catch (AWSSimpleSystemsManagementException e) { + logger.error("Error fetching parameters from AWS SSM: {}", e.getMessage(), e); + throw new RuntimeException("Error fetching parameters from AWS SSM: " + parameterNames, e); + + } catch (Exception e) { + logger.error("Unexpected error while fetching parameters: {}", parameterNames, e); + throw new RuntimeException("Unexpected error fetching parameters: " + parameterNames, e); + } + } + +} diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java new file mode 100644 index 0000000000..ec4165d03f --- /dev/null +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/stepfn/StepFunctionService.java @@ -0,0 +1,62 @@ +package com.bazaarvoice.emodb.queue.core.stepfn; + + +import com.amazonaws.services.stepfunctions.AWSStepFunctions; +import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder; +import com.amazonaws.services.stepfunctions.model.StartExecutionRequest; +import com.amazonaws.services.stepfunctions.model.StartExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Service to interact with AWS Step Functions using AWS SDK v1. + */ +public class StepFunctionService { + + private static final Logger logger = LoggerFactory.getLogger(StepFunctionService.class); + + private final AWSStepFunctions stepFunctionsClient; + + /** + * Constructor to initialize Step Function Client with AWS region and credentials. + */ + public StepFunctionService() { + this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard() + .build(); + } + + /** + * Starts the execution of a Step Function with the given state machine ARN and input payload.
+ * + * @param stateMachineArn ARN of the state machine + * @param inputPayload Input for the state machine execution + * @throws IllegalArgumentException If the stateMachineArn is invalid + */ + public void startExecution(String stateMachineArn, String inputPayload, String executionName) { + if (stateMachineArn == null || stateMachineArn.isEmpty()) { + logger.error("State Machine ARN cannot be null or empty"); + throw new IllegalArgumentException("State Machine ARN cannot be null or empty"); + } + + if (inputPayload == null) { + logger.warn("Input payload is null; using empty JSON object"); + inputPayload = "{}"; // Default to empty payload if null + } + + try { + StartExecutionRequest startExecutionRequest = new StartExecutionRequest() + .withStateMachineArn(stateMachineArn) + .withInput(inputPayload) + .withName(executionName); + + StartExecutionResult startExecutionResult = stepFunctionsClient.startExecution(startExecutionRequest); + + logger.info("Successfully started execution for state machine ARN: {}", stateMachineArn); + logger.debug("Execution ARN: {}", startExecutionResult.getExecutionArn()); + + } catch (Exception e) { + logger.error("Unexpected error occurred during Step Function execution: {}", e.getMessage(), e); + throw e; + } + } +} \ No newline at end of file diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java index 90a0ea837d..0ac9a0ed89 100644 --- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java @@ -4,7 +4,10 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.job.api.JobType; -import com.codahale.metrics.MetricRegistry; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService; +import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService; +import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.testng.annotations.Test; import java.time.Clock; @@ -38,7 +41,7 @@ public void testSizeCache() { BaseEventStore mockEventStore = mock(BaseEventStore.class); AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class), - mock(JobHandlerRegistry.class), mock(JobType.class), clock, new MetricRegistry()){}; + mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){}; // At limit=500, size estimate should be at 4800 // At limit=50, size estimate should be at 5000 diff --git a/sdk/pom.xml b/sdk/pom.xml index e8f8b7eb5b..e264ab8e53 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/sor-api/pom.xml b/sor-api/pom.xml index d7bc799c60..058e8c28ba 100644 --- a/sor-api/pom.xml +++ b/sor-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-common/pom.xml b/sor-client-common/pom.xml index 8028d79623..f345505763 100644 --- a/sor-client-common/pom.xml +++ b/sor-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-jersey2/pom.xml b/sor-client-jersey2/pom.xml 
index 4141cd9d99..dafa84a916 100644 --- a/sor-client-jersey2/pom.xml +++ b/sor-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/sor-client/pom.xml b/sor-client/pom.xml index 100d90210e..1496ad9d31 100644 --- a/sor-client/pom.xml +++ b/sor-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/sor/pom.xml b/sor/pom.xml index d2a61758ef..7112bfafe4 100644 --- a/sor/pom.xml +++ b/sor/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/table/pom.xml b/table/pom.xml index e4592dd6b0..7d962487cf 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/uac-api/pom.xml b/uac-api/pom.xml index b9ba2a6008..7b15c45c05 100644 --- a/uac-api/pom.xml +++ b/uac-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml index 38984d2abd..a422db8bd9 100644 --- a/uac-client-jersey2/pom.xml +++ b/uac-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/uac-client/pom.xml b/uac-client/pom.xml index 55aae7fdb8..82bfb686b4 100644 --- a/uac-client/pom.xml +++ b/uac-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/web-local/pom.xml b/web-local/pom.xml index 4535e14494..6692d548cc 100644 --- a/web-local/pom.xml +++ b/web-local/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/web/pom.xml b/web/pom.xml index eadb8ccbf7..a8a2619eae 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java index 00c380af6a..b86b38943d 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java @@ -381,8 +381,6 @@ public Collection getTablePlacements(@Authenticated Subject subject) { _getTablePlacementsRequestsByApiKey.getUnchecked(subject.getId()).mark(); return _blobStore.getTablePlacements(); } - - /** * Retrieves the current version of a piece of content from the data store. */ @@ -600,4 +598,4 @@ private Supplier onceOnlySupplier(final InputStream in) { return in; }; } -} +} \ No newline at end of file diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java index c6dcd408fc..88e0a04dbc 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java @@ -110,6 +110,20 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection messages) { + // Not partitioned--any server can write messages to Cassandra. 
+ _queueService.sendAll(queue, messages,true); + return SuccessResponse.instance(); + } @POST @Path("_sendbatch") @@ -154,6 +168,18 @@ public long getMessageCount(@QueryParam("partitioned") BooleanParam partitioned, } } + @GET + @Path("{queue}/uncached_size") + @RequiresPermissions("queue|get_status|{queue}") + @Timed(name = "bv.emodb.dedupq.DedupQueueResource1.getUncachedMessageCount", absolute = true) + @ApiOperation (value = "gets the uncached Message count.", + notes = "Returns a long.", + response = long.class + ) + public long getUncachedMessageCount(@PathParam("queue") String queue) { + return _queueService.getUncachedSize(queue); + } + @GET @Path("{queue}/claimcount") diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java index ff6334db05..e0a312c623 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java @@ -35,6 +35,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -120,6 +121,24 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection events) { + //TODO change query param name / type + // Not partitioned--any server can write messages to Cassandra. + _queueService.sendAll(queue, events, true); + return SuccessResponse.instance(); + } + @POST @Path("_sendbatch") @Consumes(MediaType.APPLICATION_JSON) @@ -162,6 +181,17 @@ public long getMessageCount(@PathParam("queue") String queue, @QueryParam("limit } } + @GET + @Path("{queue}/uncached_size") + @RequiresPermissions("queue|get_status|{queue}") + @Timed(name = "bv.emodb.queue.QueueResource1.getUncachedMessageCount", absolute = true) + @ApiOperation (value = "gets the uncached Message count.", + notes = "Returns a long.", + response = long.class + ) + public long getUncachedMessageCount(@PathParam("queue") String queue) { + return _queueService.getUncachedSize(queue); + } @GET @Path("{queue}/claimcount") diff --git a/yum/pom.xml b/yum/pom.xml index adee7cf547..6ae9e54f04 100644 --- a/yum/pom.xml +++ b/yum/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.171-SNAPSHOT + 6.5.204-SNAPSHOT ../parent/pom.xml
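Reviewer note: the sketch below is not part of the diff. It is only meant to show how the new queue/core classes introduced above (KafkaProducerService, ParameterStoreUtil, StepFunctionService) might be exercised together, in the order the change suggests: publish events to Kafka, read deployment settings from SSM, then kick off downstream processing via Step Functions. The class name, topic name, queue-type string, SSM parameter path, input payload, and execution name are hypothetical placeholders, and the code assumes Kafka, Parameter Store, and Step Functions are reachable with default AWS credentials.

package com.bazaarvoice.emodb.queue.core; // hypothetical location, illustration only

import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.ssm.ParameterStoreUtil;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;

import java.util.Arrays;
import java.util.List;

public class QueuePipelineSketch {
    public static void main(String[] args) {
        // Publish a batch of raw JSON events; KafkaProducerService builds its producer
        // from KafkaConfig.getProducerProps(), which is backed by the SSM parameters above.
        KafkaProducerService producerService = new KafkaProducerService();
        List<String> events = Arrays.asList("{\"id\":1}", "{\"id\":2}");
        producerService.sendMessages("dedup_my_queue", events, "queue"); // topic and queueType are assumed values

        // Fetch a deployment-specific setting from Parameter Store (path is a placeholder).
        ParameterStoreUtil parameterStore = new ParameterStoreUtil();
        String stateMachineArn = parameterStore.getParameter("/local_default/emodb/stepfn/stateMachineArn");

        // Start downstream processing via Step Functions with a unique execution name.
        StepFunctionService stepFunctionService = new StepFunctionService();
        stepFunctionService.startExecution(stateMachineArn, "{\"queue\":\"my_queue\"}",
                "my_queue-" + System.currentTimeMillis());

        // Flush and release the Kafka producer on shutdown.
        producerService.close();
    }
}

The timestamp suffix on the execution name is one way to keep names unique, since Step Functions rejects an execution whose name duplicates a recent execution of the same state machine; the diff leaves the naming scheme to the caller.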