Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented integration for AWS SQS service. #7

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MoquiConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<tools>
<tool-factory class="org.moqui.aws.S3ClientToolFactory" init-priority="40" disabled="false"/>
<tool-factory class="org.moqui.aws.SnsClientToolFactory" init-priority="40" disabled="false"/>
<tool-factory class="org.moqui.aws.SqsClientToolFactory" init-priority="40" disabled="false"/>
</tools>
<resource-facade>
<resource-reference scheme="aws3" class="org.moqui.aws.S3ResourceReference"/>
Expand Down
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ dependencies {
implementation project(':framework')

// AWS Java SDK for supported services (depends on core and kms jars)
implementation group: 'software.amazon.awssdk', name: 's3', version: '2.17.4' // Apache 2.0
implementation group: 'software.amazon.awssdk', name: 'sns', version: '2.17.4' // Apache 2.0
implementation group: 'software.amazon.awssdk', name: 's3', version: '2.23.4' // Apache 2.0
implementation group: 'software.amazon.awssdk', name: 'sns', version: '2.23.4' // Apache 2.0
implementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.23.4' // Apache 2.0

// explicit dependencies to get newer versions
implementation 'io.netty:netty-codec-http:4.1.66.Final'
Expand Down
279 changes: 279 additions & 0 deletions service/moqui/aws/SqsServices.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
This software is in the public domain under CC0 1.0 Universal plus a
Grant of Patent License.

To the extent possible under law, the author(s) have dedicated all
copyright and related and neighboring rights to this software to the
public domain worldwide. This software is distributed without any
warranty.

You should have received a copy of the CC0 Public Domain Dedication
along with this software (see the LICENSE.md file). If not, see
<http://creativecommons.org/publicdomain/zero/1.0/>.
-->

<services xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://moqui.org/xsd/service-definition-3.xsd">

<service verb="create" noun="Queue" type="interface">
<in-parameters>
<parameter name="queueName" required="true"/>
<parameter name="visibilityTimeout" type="Integer"/><!-- Unit - seconds, should be between 0 seconds and 12 hours. -->
<parameter name="messageRetentionPeriod" type="Integer"/><!-- Unit - seconds, should be between 60 seconds (1 minute) and 1,209,600 seconds (14 days). -->
<parameter name="deliveryDelay" type="Integer"/><!-- Unit - seconds, should be between 0 seconds and 15 minutes. -->
<parameter name="receiveMessageWaitTime" type="Integer"/><!-- Unit - seconds, should be between 0 and 20 seconds. -->
<parameter name="maximumMessageSize" type="Integer"/><!-- Should be between 1 KB and 256 KB. -->
</in-parameters>
<out-parameters>
<parameter name="queueUrl"/>
</out-parameters>
</service>
<service verb="receive" noun="Messages" type="interface">
<in-parameters>
<parameter name="queueUrl" required="true"/>
<parameter name="maxNumberOfMessages" type="Integer" default-value="1"/><!-- defaults to 1 - ReceiveMessage API default, allowed up to 10 -->
<parameter name="waitTimeSeconds" type="Integer" default-value="0"/><!-- defaults to 0 - short polling, allowed between 1 second and 20 seconds for long polling -->
<parameter name="visibilityTimeoutSeconds" type="Integer"/>
</in-parameters>
<out-parameters>
<parameter name="messageList" type="List">
<parameter name="message"/>
<parameter name="receiptHandle"/>
</parameter>
</out-parameters>
</service>

<service verb="create" noun="StandardQueue">
<description>Create Amazon SQS Standard Queue</description>
<implements service="moqui.aws.SqsServices.create#Queue"/>
<actions>
<script>
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import software.amazon.awssdk.services.sqs.model.SqsException

sqsClientFactory = ec.factory.getToolFactory('AwsSqsClient')
if (!sqsClientFactory) {
ec.message.addError("AWS SQS tool not active.")
return
}

queueAttributes = [:]
if (visibilityTimeout) queueAttributes.add(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout)
if (messageRetentionPeriod) queueAttributes.add(QueueAttributeName.MESSAGE_RETENTION_PERIOD, messageRetentionPeriod)
if (deliveryDelay) queueAttributes.add(QueueAttributeName.DELAY_SECONDS, deliveryDelay)
if (receiveMessageWaitTime) queueAttributes.add(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTime)
if (maximumMessageSize) queueAttributes.add(QueueAttributeName.MAXIMUM_MESSAGE_SIZE, maximumMessageSize)

try {
sqsClient = ec.getTool('AwsSqsClient', SqsClient.class)
createQueueRequestBuilder = CreateQueueRequest.builder().queueName(queueName)
if (queueAttributes) createQueueRequestBuilder.attributes(queueAttributes)
createQueueRequest = createQueueRequestBuilder.build();
createQueueResponse = sqsClient.createQueue(createQueueRequest)
} catch (Exception e) {
ec.logger.error("Error creating SQS queue [${queueName}]", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}

queueUrl = createQueueResponse.queueUrl()
</script>

<log message="Created SQS queue [${queueName}] at [${queueUrl}]"/>
</actions>
</service>
<service verb="delete" noun="Queue">
<in-parameters>
<parameter name="queueUrl" required="true"/>
</in-parameters>
<actions>
<if condition="!ec.factory.getToolFactory('AwsSqsClient')">
<return error="true" message="AWS SQS tool not active."/>
</if>

<set field="sqsClient" from="ec.getTool('AwsSqsClient', software.amazon.awssdk.services.sqs.SqsClient.class)"/>
<set field="deleteQueueRequest" from="software.amazon.awssdk.services.sqs.model.DeleteQueueRequest.builder().queueUrl(queueUrl).build()"/>

<script>
import software.amazon.awssdk.services.sqs.model.SqsException

try {
sqsClient.deleteQueue(deleteQueueRequest)
} catch (Exception e) {
ec.logger.error("Error creating SQS queue [${queueName}]", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}
</script>

<log message="Deleted SQS queue [${queueUrl}]"/>
</actions>
</service>
<service verb="get" noun="QueueUrl">
<in-parameters>
<parameter name="queueName" required="true"/>
</in-parameters>
<out-parameters>
<parameter name="queueUrl"/>
</out-parameters>
<actions>
<if condition="!ec.factory.getToolFactory('AwsSqsClient')">
<return error="true" message="AWS SQS tool not active."/>
</if>

<set field="sqsClient" from="ec.getTool('AwsSqsClient', software.amazon.awssdk.services.sqs.SqsClient.class)"/>
<set field="getQueueUrlRequest" from="software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest.builder().queueName(queueName).build()"/>

<script>
import software.amazon.awssdk.services.sqs.model.SqsException

try {
queueUrl = sqsClient.getQueueUrl(getQueueUrlRequest).queueUrl();
} catch (Exception e) {
ec.logger.error("Error getting queueUrl for SQS queue [${queueName}]", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}
</script>
</actions>
</service>
<service verb="list" noun="Queues">
<in-parameters>
<parameter name="queueNamePrefix"/>
</in-parameters>
<out-parameters>
<parameter name="queueUrls" type="List"/>
</out-parameters>
<actions>
<if condition="!ec.factory.getToolFactory('AwsSqsClient')">
<return error="true" message="AWS SQS tool not active."/>
</if>

<set field="sqsClient" from="ec.getTool('AwsSqsClient', software.amazon.awssdk.services.sqs.SqsClient.class)"/>
<set field="requestBuilder" from="software.amazon.awssdk.services.sqs.model.ListQueuesRequest.builder()"/>
<if condition="queueNamePrefix">
<set field="requestBuilder" from="requestBuilder.queueNamePrefix(queueNamePrefix)"/>
</if>
<set field="listQueuesRequest" from="requestBuilder.build()"/>

<script>
import software.amazon.awssdk.services.sqs.model.SqsException

try {
queueUrls = sqsClient.listQueues(listQueuesRequest).queueUrls()
} catch (Exception e) {
ec.logger.error("Error retrieving queues from AWS SQS service", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}
</script>
</actions>
</service>
<service verb="send" noun="Message">
<in-parameters>
<parameter name="queueUrl" required="true"/>
<parameter name="messageText" required="true"/>
</in-parameters>
<actions>
<if condition="!ec.factory.getToolFactory('AwsSqsClient')">
<return error="true" message="AWS SQS tool not active."/>
</if>

<set field="sqsClient" from="ec.getTool('AwsSqsClient', software.amazon.awssdk.services.sqs.SqsClient.class)"/>
<set field="sendMessageRequest" from="software.amazon.awssdk.services.sqs.model.SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageText).build()"/>

<script>
import software.amazon.awssdk.services.sqs.model.SqsException

try {
sqsClient.sendMessage(sendMessageRequest)
} catch (Exception e) {
ec.logger.error("Error sending message to SQS queue [${queueName}]", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}
</script>

<log message="Sent message [${messageText}] to sqs queue [${queueUrl}]"/>
</actions>
</service>
<service verb="receive" noun="QueuedMessages">
<implements service="moqui.aws.SqsServices.receive#Messages"/>
<actions>
<if condition="!ec.factory.getToolFactory('AwsSqsClient')">
<return error="true" message="AWS SQS tool not active."/>
</if>

<set field="sqsClient" from="ec.getTool('AwsSqsClient', software.amazon.awssdk.services.sqs.SqsClient.class)"/>
<set field="requestBuilder" from="software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(maxNumberOfMessages).waitTimeSeconds(waitTimeSeconds)"/>
<if condition="visibilityTimeoutSeconds">
<set field="requestBuilder" from="requestBuilder.visibilityTimeout(visibilityTimeoutSeconds)"/>
</if>
<set field="receiveMessageRequest" from="requestBuilder.build()"/>

<script>
import software.amazon.awssdk.services.sqs.model.SqsException

try {
messages = sqsClient.receiveMessage(receiveMessageRequest).messages()
} catch (Exception e) {
ec.logger.error("Error receiving messages from SQS queue [${queueName}]", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}
</script>

<set field="messageList" from="[]"/>
<iterate list="messages" entry="message">
<set field="messageMap" from="[:]"/>
<set field="messageMap.message" from="message.body()"/>
<set field="messageMap.receiptHandle" from="message.receiptHandle()"/>
<script>messageList.add(messageMap)</script>
</iterate>
</actions>
</service>
<service verb="delete" noun="Message">
<in-parameters>
<parameter name="queueUrl" required="true"/>
<parameter name="receiptHandle" required="true"/>
</in-parameters>
<actions>
<if condition="!ec.factory.getToolFactory('AwsSqsClient')">
<return error="true" message="AWS SQS tool not active."/>
</if>

<set field="sqsClient" from="ec.getTool('AwsSqsClient', software.amazon.awssdk.services.sqs.SqsClient.class)"/>
<set field="deleteMessageRequest" from="software.amazon.awssdk.services.sqs.model.DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle).build()"/>

<script>
import software.amazon.awssdk.services.sqs.model.SqsException

try {
sqsClient.deleteMessage(deleteMessageRequest);
} catch (Exception e) {
ec.logger.error("Error deleting message [${receiptHandle}] from SQS queue [${queueName}]", e)
ec.message.addError((e instanceof SqsException) ? ((SqsException)e).awsErrorDetails().errorMessage() : e.getMessage())
}
</script>

<log message="Deleted message [${receiptHandle}] from SQS queue [${queueUrl}]"/>
</actions>
</service>
<service verb="poll" noun="Queue">
<implements service="moqui.aws.SqsServices.receive#Messages"/>
<in-parameters>
<parameter name="messageThreshold" type="Integer" default-value="10"/><!-- stop polling when number of messages received exceeds threshold -->
</in-parameters>
<actions>
<set field="messageList" from="[]"/>
<set field="hasMessages" type="Boolean" value="true"/>
<while condition="hasMessages &amp;&amp; messageList.size() &lt; messageThreshold">
<service-call name="moqui.aws.SqsServices.receive#QueuedMessages" in-map="context" out-map="receiveMessageResponse"/>
<log level="info" message="receiveMessageResponse.messageList.size : ${receiveMessageResponse.messageList.size()}"/>
<if condition="!receiveMessageResponse.messageList">
<set field="hasMessages" type="Boolean" value="false"/>
<else>
<script>messageList.addAll(receiveMessageResponse.messageList)</script>
</else>
</if>
</while>
</actions>
</service>
</services>
82 changes: 82 additions & 0 deletions src/main/groovy/org/moqui/aws/SqsClientToolFactory.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* This software is in the public domain under CC0 1.0 Universal plus a
* Grant of Patent License.
*
* To the extent possible under law, the author(s) have dedicated all
* copyright and related and neighboring rights to this software to the
* public domain worldwide. This software is distributed without any
* warranty.
*
* You should have received a copy of the CC0 Public Domain Dedication
* along with this software (see the LICENSE.md file). If not, see
* <http://creativecommons.org/publicdomain/zero/1.0/>.
*/
package org.moqui.aws

import groovy.transform.CompileStatic
import org.moqui.context.ExecutionContextFactory
import org.moqui.context.ToolFactory
import org.moqui.util.SystemBinding
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.SqsClientBuilder


/** A ToolFactory for AWS SQS Client */
@CompileStatic
class SqsClientToolFactory implements ToolFactory<SqsClient>{
protected final static Logger logger = LoggerFactory.getLogger(SqsClientToolFactory.class)
final static String TOOL_NAME = "AwsSqsClient"

protected ExecutionContextFactory ecf = null
protected SqsClient sqsClient = null

/** Default empty constructor */
SqsClientToolFactory(){ }

@Override String getName() { return TOOL_NAME }

@Override SqsClient getInstance(Object... parameters) { return sqsClient }

@Override
void init(ExecutionContextFactory ecf) {
this.ecf = ecf
// NOTE: minimal explicit configuration here, see:
// https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html
// https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html

// There is no Java sys prop key for region, and env var vs Java sys prop keys are different for access key ID and secret
// so normalize here to the standard SDK env var keys and support from Java sys props as well
String awsRegion = SystemBinding.getPropOrEnv("AWS_REGION")
String awsAccessKeyId = SystemBinding.getPropOrEnv("AWS_ACCESS_KEY_ID")
String awsSecret = SystemBinding.getPropOrEnv("AWS_SECRET_ACCESS_KEY")

// Non standard AWS, for example Minio.
String awsEndpointURL = SystemBinding.getPropOrEnv("AWS_ENDPOINT_URL")
if (awsAccessKeyId && awsSecret) {
System.setProperty("aws.accessKeyId", awsAccessKeyId)
System.setProperty("aws.secretAccessKey", awsSecret)
}

logger.info("Starting AWS SQS Client with region ${awsRegion} access ID ${awsAccessKeyId}")

SqsClientBuilder cb = SqsClient.builder()
if (awsRegion) cb.region(Region.of(awsRegion))
if (awsEndpointURL) cb.endpointOverride(new URI(awsEndpointURL))
sqsClient = cb.build()
}

@Override
void destroy() {
if (sqsClient != null) {
try {
sqsClient.close()
logger.info("AWS SQS Client closed")
} catch (Throwable t) {
logger.error("Error in AWS SQS Client close", t)
}
}
}
}