Skip to content
This repository has been archived by the owner on Sep 22, 2020. It is now read-only.

Commit

Permalink
sqs integration
Browse files Browse the repository at this point in the history
  • Loading branch information
snlamm committed Dec 28, 2017
1 parent cdcbf06 commit 9b711c8
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 12 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"devDependencies": {
"amqplib": "^0.5.1",
"ava": "^0.18.1",
"aws-sdk": "^2.172.0",
"babel-cli": ">=6.7 <=7.0",
"babel-eslint": "^6.1.1",
"babel-preset-grind": "^0.7.0",
Expand Down
4 changes: 2 additions & 2 deletions src/Commands/QueueWorkCommand.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class QueueWorkCommand extends Command {

options = [
new InputOption('queue', InputOption.VALUE_OPTIONAL, 'Specify the queue(s) to perform work for.'),
new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrency.', '1'),
new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrently.', '1'),
new InputOption('watch', InputOption.VALUE_OPTIONAL, 'Folders to watch for changes')
]

Expand All @@ -36,7 +36,7 @@ export class QueueWorkCommand extends Command {

async run() {
let queues = null
const connection = this.app.queue.get(this.argument('connection'))
const connection = await this.app.queue.get(this.argument('connection'))

if(this.containsOption('queue')) {
queues = this.option('queue').split(/,/).map(job => job.trim()).filter(job => job.length > 0)
Expand Down
10 changes: 10 additions & 0 deletions src/Drivers/BaseDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Base class all drivers must extend
*/
export class BaseDriver {

app = null
state = null
retryDelay = 90000
Expand All @@ -16,6 +17,15 @@ export class BaseDriver {
}
}

/**
* Performs setup operations when starting the driver
*
* @return Promise
*/
ready() {
return Promise.resolve()
}

/**
* Connects to the backend engine
*
Expand Down
105 changes: 105 additions & 0 deletions src/Drivers/SQSDriver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import './BaseDriver'
import '../Support/SQS'

/**
* AWS Simple Queue Service (SQS) backed Queue Driver
*/
export class SQSDriver extends BaseDriver {
client = null

constructor(app, config) {
super(app, config)

this.client = new SQS(config)
}

ready() {
return this.client.createQueue()
}

connect() {
return Promise.resolve()
}

async dispatch(job) {
const payload = this.buildPayload(job)
const queueUrl = this.client.queueUrls[payload.queue]

if(queueUrl.isNil) {
return Promise.reject(`Unable to find SQS url for job queue ${payload.queue}`)
}

const params = {
QueueUrl: queueUrl,
MessageBody: JSON.stringify(payload),
DelaySeconds: payload.delay || 0
}

return this.client.put(params)
}

listen(queues, concurrency, jobHandler, errorHandler) {
// SQS can batch ingest up to 10 jobs in a single call
const concurrentListens = Math.ceil(concurrency / 10)
const remainderJobConcurrency = 10 - ((concurrentListens * 10) - concurrency)

const listeners = [ ]

for(let i = 0; i < concurrentListens; i++) {
for(const queue of queues) {
let concurrentJobs = 10

if(i === (concurrentListens - 1)) {
concurrentJobs = remainderJobConcurrency
}

listeners.push(this._listen(queue, concurrentJobs, jobHandler, errorHandler))
}
}

return Promise.all(listeners)
}

_listen(queue, concurrency, jobHandler, errorHandler) {
return this.client.watch(queue, concurrency, async function callback(jobData, dispatchedAt, pastTries = 1) {
const job = JSON.parse(jobData)
const isExpired = timeout => {
return (timeout !== 0) && ((new Date - dispatchedAt) > timeout)
}

try {
if(isExpired(job.timeout)) {
return
}

await jobHandler(job)
} catch(err) {
try {
const tries = Number.parseInt(job.tries) || 1

if((pastTries >= tries) || isExpired(job.timeout)) {
throw err
}

if(job.retry_delay > 0) {
await new Promise(resolve => {
return setTimeout(() => {
return resolve()
}, job.retry_delay)
})
}
} catch(err) {
return errorHandler(job, err)
}

return callback(jobData, dispatchedAt, pastTries += 1)
}
})
}

destroy() {
this.client.constructor.queueUrls = { }
return super.destroy()
}

}
25 changes: 16 additions & 9 deletions src/QueueFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import './Queue'
import './Drivers/BaseDriver'
import './Drivers/BeanstalkDriver'
import './Drivers/RabbitDriver'
import './Drivers/SQSDriver'

export class QueueFactory {
app = null
Expand All @@ -14,22 +15,25 @@ export class QueueFactory {
beanstalkd: BeanstalkDriver,
rabbit: RabbitDriver,
rabbitmq: RabbitDriver,
amqp: RabbitDriver
amqp: RabbitDriver,
sqs: SQSDriver
}

constructor(app) {
this.app = app
}

dispatch(job, connection = null) {
return this.get(connection).dispatch(job)
async dispatch(job, connection = null) {
connection = await this.get(connection)
return connection.dispatch(job)
}

status(job, connection = null) {
return this.get(connection).status(job)
async status(job, connection = null) {
connection = await this.get(connection)
return connection.status(job)
}

get(connection) {
async get(connection) {
let name = null

if(connection.isNil) {
Expand Down Expand Up @@ -58,7 +62,7 @@ export class QueueFactory {
throw new Error(`Unsupported queue driver: ${config.driver}`)
}

connection = this.make(driverClass, config)
connection = await this.make(driverClass, config)

if(!name.isNil) {
this.connections[name] = connection
Expand All @@ -67,8 +71,11 @@ export class QueueFactory {
return connection
}

make(driverClass, config) {
return new Queue(this.app, this, new driverClass(this.app, config))
async make(driverClass, config) {
const driver = new driverClass(this.app, config)
await driver.ready()

return new Queue(this.app, this, driver)
}

registerDriver(name, driverClass) {
Expand Down
7 changes: 6 additions & 1 deletion src/QueueProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ import './QueueFactory'
import './Commands/MakeJobCommand'
import './Commands/QueueWorkCommand'

export function QueueProvider(app, classes = { }) {
export async function QueueProvider(app, classes = { }) {
const factoryClass = classes.factoryClass || QueueFactory
app.queue = new factoryClass(app)

for(const connectionName of Object.keys(app.config.get('queue.connections'))) {
// Trigger initial setup of backend engines
await app.queue.get(connectionName)
}

if(app.cli.isNil) {
return
}
Expand Down
Loading

0 comments on commit 9b711c8

Please sign in to comment.