diff --git a/package.json b/package.json index 8c39cbf..4a27046 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "devDependencies": { "amqplib": "^0.5.1", "ava": "^0.18.1", + "aws-sdk": "^2.172.0", "babel-cli": ">=6.7 <=7.0", "babel-eslint": "^6.1.1", "babel-preset-grind": "^0.7.0", diff --git a/src/Commands/QueueWorkCommand.js b/src/Commands/QueueWorkCommand.js index 8bc7ed9..c84f53f 100644 --- a/src/Commands/QueueWorkCommand.js +++ b/src/Commands/QueueWorkCommand.js @@ -11,7 +11,7 @@ export class QueueWorkCommand extends Command { options = [ new InputOption('queue', InputOption.VALUE_OPTIONAL, 'Specify the queue(s) to perform work for.'), - new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrency.', '1'), + new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrently.', '1'), new InputOption('watch', InputOption.VALUE_OPTIONAL, 'Folders to watch for changes') ] @@ -36,7 +36,7 @@ export class QueueWorkCommand extends Command { async run() { let queues = null - const connection = this.app.queue.get(this.argument('connection')) + const connection = await this.app.queue.get(this.argument('connection')) if(this.containsOption('queue')) { queues = this.option('queue').split(/,/).map(job => job.trim()).filter(job => job.length > 0) diff --git a/src/Drivers/BaseDriver.js b/src/Drivers/BaseDriver.js index 8659dcd..506f31b 100644 --- a/src/Drivers/BaseDriver.js +++ b/src/Drivers/BaseDriver.js @@ -2,6 +2,7 @@ * Base class all drivers must extend */ export class BaseDriver { + app = null state = null retryDelay = 90000 @@ -16,6 +17,15 @@ export class BaseDriver { } } + /** + * Performs setup operations when starting the driver + * + * @return Promise + */ + ready() { + return Promise.resolve() + } + /** * Connects to the backend engine * diff --git a/src/Drivers/SQSDriver.js b/src/Drivers/SQSDriver.js new file mode 100644 index 0000000..9777801 --- /dev/null +++ b/src/Drivers/SQSDriver.js @@ -0,0 +1,105 @@ +import './BaseDriver' +import '../Support/SQS' + +/** + * AWS Simple Queue Service (SQS) backed Queue Driver + */ +export class SQSDriver extends BaseDriver { + client = null + + constructor(app, config) { + super(app, config) + + this.client = new SQS(config) + } + + ready() { + return this.client.createQueue() + } + + connect() { + return Promise.resolve() + } + + async dispatch(job) { + const payload = this.buildPayload(job) + const queueUrl = this.client.queueUrls[payload.queue] + + if(queueUrl.isNil) { + return Promise.reject(`Unable to find SQS url for job queue ${payload.queue}`) + } + + const params = { + QueueUrl: queueUrl, + MessageBody: JSON.stringify(payload), + DelaySeconds: payload.delay || 0 + } + + return this.client.put(params) + } + + listen(queues, concurrency, jobHandler, errorHandler) { + // SQS can batch ingest up to 10 jobs in a single call + const concurrentListens = Math.ceil(concurrency / 10) + const remainderJobConcurrency = 10 - ((concurrentListens * 10) - concurrency) + + const listeners = [ ] + + for(let i = 0; i < concurrentListens; i++) { + for(const queue of queues) { + let concurrentJobs = 10 + + if(i === (concurrentListens - 1)) { + concurrentJobs = remainderJobConcurrency + } + + listeners.push(this._listen(queue, concurrentJobs, jobHandler, errorHandler)) + } + } + + return Promise.all(listeners) + } + + _listen(queue, concurrency, jobHandler, errorHandler) { + return this.client.watch(queue, concurrency, async function callback(jobData, dispatchedAt, pastTries = 1) { + const job = JSON.parse(jobData) + const isExpired = timeout => { + return (timeout !== 0) && ((new Date - dispatchedAt) > timeout) + } + + try { + if(isExpired(job.timeout)) { + return + } + + await jobHandler(job) + } catch(err) { + try { + const tries = Number.parseInt(job.tries) || 1 + + if((pastTries >= tries) || isExpired(job.timeout)) { + throw err + } + + if(job.retry_delay > 0) { + await new Promise(resolve => { + return setTimeout(() => { + return resolve() + }, job.retry_delay) + }) + } + } catch(err) { + return errorHandler(job, err) + } + + return callback(jobData, dispatchedAt, pastTries += 1) + } + }) + } + + destroy() { + this.client.constructor.queueUrls = { } + return super.destroy() + } + +} diff --git a/src/QueueFactory.js b/src/QueueFactory.js index 214c1c8..3d9473c 100644 --- a/src/QueueFactory.js +++ b/src/QueueFactory.js @@ -4,6 +4,7 @@ import './Queue' import './Drivers/BaseDriver' import './Drivers/BeanstalkDriver' import './Drivers/RabbitDriver' +import './Drivers/SQSDriver' export class QueueFactory { app = null @@ -14,22 +15,25 @@ export class QueueFactory { beanstalkd: BeanstalkDriver, rabbit: RabbitDriver, rabbitmq: RabbitDriver, - amqp: RabbitDriver + amqp: RabbitDriver, + sqs: SQSDriver } constructor(app) { this.app = app } - dispatch(job, connection = null) { - return this.get(connection).dispatch(job) + async dispatch(job, connection = null) { + connection = await this.get(connection) + return connection.dispatch(job) } - status(job, connection = null) { - return this.get(connection).status(job) + async status(job, connection = null) { + connection = await this.get(connection) + return connection.status(job) } - get(connection) { + async get(connection) { let name = null if(connection.isNil) { @@ -58,7 +62,7 @@ export class QueueFactory { throw new Error(`Unsupported queue driver: ${config.driver}`) } - connection = this.make(driverClass, config) + connection = await this.make(driverClass, config) if(!name.isNil) { this.connections[name] = connection @@ -67,8 +71,11 @@ export class QueueFactory { return connection } - make(driverClass, config) { - return new Queue(this.app, this, new driverClass(this.app, config)) + async make(driverClass, config) { + const driver = new driverClass(this.app, config) + await driver.ready() + + return new Queue(this.app, this, driver) } registerDriver(name, driverClass) { diff --git a/src/QueueProvider.js b/src/QueueProvider.js index 41c6b8b..bdbda86 100644 --- a/src/QueueProvider.js +++ b/src/QueueProvider.js @@ -3,10 +3,15 @@ import './QueueFactory' import './Commands/MakeJobCommand' import './Commands/QueueWorkCommand' -export function QueueProvider(app, classes = { }) { +export async function QueueProvider(app, classes = { }) { const factoryClass = classes.factoryClass || QueueFactory app.queue = new factoryClass(app) + for(const connectionName of Object.keys(app.config.get('queue.connections'))) { + // Trigger initial setup of backend engines + await app.queue.get(connectionName) + } + if(app.cli.isNil) { return } diff --git a/src/Support/SQS.js b/src/Support/SQS.js new file mode 100644 index 0000000..dc50178 --- /dev/null +++ b/src/Support/SQS.js @@ -0,0 +1,244 @@ +import { MissingPackageError } from 'grind-framework' + +let sqs = null + +/** + * Loads the aws-sdk package or throws an error + * if it hasn‘t been added + */ +function loadPackage() { + if(!sqs.isNil) { + return + } + + try { + sqs = require('aws-sdk').SQS + } catch(err) { + throw new MissingPackageError('aws-sdk') + } +} + +/** + * Wrapper around AWS SQS to provide a + * promise based interface + simplifies a few ops + */ +export class SQS { + + static queueUrls = { } + + get queueUrls() { + return this.constructor.queueUrls + } + + client = null + queueConfigs = null + + constructor(config) { + loadPackage() + this.queueConfigs = config.queues + + const serviceConfig = { + region: config.region || 'us-east-1' + } + + // NOTE preferred way to access SQS is via an AWS IAM role + if(!config.access_key.isNil && !config.secret_key.isNil) { + config.accessKeyId = config.access_key + config.secretAccessKey = config.secret_key + } + + this.client = new sqs(serviceConfig) + } + + /* + To create queues, use the `queues` object via the config: + "connections": { + "sqs-connection": { + "driver": "sqs", + "queues": { + "test-queue-name": { ...queue options here }, + "test-queue-name2": { "QueueUrl": "https://sqs.us-east-1..." } + } + } + } + + On app start, grind-queue calls AWS to create/update your queues as necessary and grab the URLs. + For any queue options not specified, we'll use the AWS defaults. + The queue name `default` is reserved for setting queue defaults different from AWS's. + + If you want to use an SQS queue managed without grind-queue, set the QueueUrl option. + Grind-queue will use that url and, on startup, will skip the inital calls to AWS. + */ + async createQueue() { + for(const [ name, queueAttributes ] of Object.entries(this.queueConfigs)) { + // `default` queue name is reserved for setting new defaults + if(name === 'default') { + continue + } + + // If QueueUrl option is set, simply use the url + if(!queueAttributes.QueueUrl.isNil) { + this.queueUrls[name] = queueAttributes.QueueUrl + continue + } + + const defaultAttributes = Object.assign({ + // AWS defaults - set explicitly here to ovveride changes made via SQS console/cli + DelaySeconds: '0', + MaximumMessageSize: '262144', + MessageRetentionPeriod: '345600', + ReceiveMessageWaitTimeSeconds: '0', + VisibilityTimeout: '30' + }, (this.queueConfigs.default || { })) + + const params = { + QueueName: name, + Attributes: { + ...Object.assign(defaultAttributes, queueAttributes) + } + } + + await new Promise((res, rej) => this.findOrCreateQueue(res, rej, name, params)) + } + } + + async findOrCreateQueue(resolve, reject, name, params) { + let queueUrl = params.QueueUrl + + if(!queueUrl.isNil) { + // set remote url for queue + this.queueUrls[name] = queueUrl + return resolve() + } + + try { + queueUrl = await new Promise((resolve2, reject2) => { + return this.client.createQueue(params, async (err, data) => { + if(!err.isNil && (err.code === 'QueueAlreadyExists')) { + // error triggered when remote queue exists but config and remote attributes differ + await this.findAndUpdateQueue(params) + return resolve2(params.QueueUrl) + } else if(!err.isNil) { + return reject2(err) + } + + // resolves here if queue is created -or- queue exists and config and remote attributes match + return resolve2(data.QueueUrl) + }) + }) + } catch(err) { + return reject(err) + } + + // set remote url for queue + this.queueUrls[name] = queueUrl + return resolve() + } + + async findAndUpdateQueue(params) { + // Set queue url then update queue attributes + params.QueueUrl = await new Promise((resolve, reject) => { + return this.client.getQueueUrl({ QueueName: params.QueueName }, (err, data) => { + return err.isNil ? resolve(data.QueueUrl) : reject(err) + }) + }) + + return new Promise((resolve, reject) => { + delete params.QueueName + + return this.client.setQueueAttributes(params, err /* ,data */ => { + return err.isNil ? resolve() : reject(err) + }) + }) + } + + put(params) { + return new Promise((resolve, reject) => { + return this.client.sendMessage(params, (err, data) => { + return err.isNil ? resolve(data) : reject(err) + }) + }) + } + + async watch(queue, concurrency, handler) { + const queueUrl = this.queueUrls[queue] + + if(queueUrl.isNil) { + throw new Error(`Queue url not found ${queue}`) + } + + const params = { + QueueUrl: queueUrl, + MaxNumberOfMessages: concurrency, + // VisibilityTimeout: 30 - set via queue-wide VisibilityTimeout + WaitTimeSeconds: 20, + AttributeNames: [ 'SentTimestamp' ] + } + + const messages = await new Promise(resolve => { + return this.client.receiveMessage(params, (err, data) => { + if(err) { + return resolve() + } else if((data.Messages === void 0) || (data.Messages.length === 0)) { + return resolve() + } + + return resolve(data.Messages) + }) + }) + + if(Array.isArray(messages)) { + // Delete jobs from SQS so they are not reprocessed, then process them + await this.deleteFromQueue(queueUrl, messages) + await Promise.all(messages.filter(message => message.isDeleted).map(message => { + const messageData = message.Body + const dispatchedAt = new Date(message.Attributes.SentTimestamp) + + return handler(messageData, dispatchedAt) + })) + } + + // Once finished processing current jobs, re-watch the queue + return this.watch(queue, concurrency, handler) + } + + async deleteFromQueue(url, messages) { + const params = { + QueueUrl: url, + Entries: messages.map((message, i) => { + return { + Id: `${i}`, + ReceiptHandle: message.ReceiptHandle + } + }) + } + + try { + await new Promise((resolve, reject) => { + return this.client.deleteMessageBatch(params, (err, data) => { + if(!err.isNil) { + return reject(err) + } + + for(const result of data.Successful) { + messages[result.Id].isDeleted = true + } + + if(data.Failed.length > 0) { + const failed = data.Failed + const mssg = failed.map(result => `{senderFault: ${result.SenderFault}, code: ${result.Code}}`) + + Log.error( + `Failed to delete ${failed.length} messasges. Releasing back into queue. Error ${mssg}` + ) + } + + return resolve() + }) + }) + } catch(err) { + Log.error(`Failed to delete messasges. Releasing back into queue. Error ${err}`) + } + } + +}