Skip to content
This repository has been archived by the owner on Sep 22, 2020. It is now read-only.

SQS Integration #5

Open
wants to merge 3 commits into
base: abstract-queue
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"beanstalk",
"ampq",
"queue",
"faktory"
"faktory",
"sqs"
],
"peerDependencies": {
"grind-cli": "^0.7.0",
Expand All @@ -25,6 +26,7 @@
"devDependencies": {
"amqplib": "^0.5.1",
"ava": "^0.18.1",
"aws-sdk": "^2.172.0",
"babel-cli": ">=6.7 <=7.0",
"babel-eslint": "^6.1.1",
"babel-preset-grind": "^0.8.0-beta.1",
Expand Down
4 changes: 2 additions & 2 deletions src/Commands/QueueWorkCommand.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class QueueWorkCommand extends Command {

options = [
new InputOption('queue', InputOption.VALUE_OPTIONAL, 'Specify the queue(s) to perform work for.'),
new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrency.', '1'),
new InputOption('concurrency', InputOption.VALUE_OPTIONAL, 'Number of jobs to process concurrently.', '1'),
new InputOption('watch', InputOption.VALUE_OPTIONAL, 'Folders to watch for changes')
]

Expand All @@ -36,7 +36,7 @@ export class QueueWorkCommand extends Command {

async run() {
let queues = null
const connection = this.app.queue.get(this.argument('connection'))
const connection = await this.app.queue.get(this.argument('connection'))

if(this.containsOption('queue')) {
queues = this.option('queue').split(/,/).map(job => job.trim()).filter(job => job.length > 0)
Expand Down
10 changes: 10 additions & 0 deletions src/Drivers/BaseDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Base class all drivers must extend
*/
export class BaseDriver {

app = null
state = null
retryDelay = 90000
Expand All @@ -16,6 +17,15 @@ export class BaseDriver {
}
}

/**
* Performs setup operations when starting the driver
*
* @return Promise
*/
ready() {
return Promise.resolve()
}

/**
* Connects to the backend engine
*
Expand Down
109 changes: 109 additions & 0 deletions src/Drivers/SQSDriver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import './BaseDriver'
import '../Support/SQS'

/**
* AWS Simple Queue Service (SQS) backed Queue Driver
*/
export class SQSDriver extends BaseDriver {
client = null

constructor(app, config) {
super(app, config)

this.client = new SQS(config)
}

ready() {
return this.client.createQueue()
}

connect() {
return Promise.resolve()
}

async dispatch(job) {
const payload = this.buildPayload(job)
const queueUrl = this.client.queueUrls[payload.queue]

if(queueUrl.isNil) {
return Promise.reject(`Unable to find SQS url for job queue ${payload.queue}`)
}

const params = {
QueueUrl: queueUrl,
MessageBody: JSON.stringify(payload)
}

if(payload.delay > 0) {
params.DelaySeconds = Math.round(payload.delay / 1000)
}

return this.client.put(params)
}

listen(queues, concurrency, jobHandler, errorHandler) {
// SQS can batch ingest up to 10 jobs in a single call
const concurrentListens = Math.ceil(concurrency / 10)
const remainderJobConcurrency = 10 - ((concurrentListens * 10) - concurrency)

const listeners = [ ]

for(let i = 0; i < concurrentListens; i++) {
for(const queue of queues) {
let concurrentJobs = 10

if(i === (concurrentListens - 1)) {
concurrentJobs = remainderJobConcurrency
}

listeners.push(this._listen(queue, concurrentJobs, jobHandler, errorHandler))
}
}

return Promise.all(listeners)
}

_listen(queue, concurrency, jobHandler, errorHandler) {
return this.client.watch(queue, concurrency, async function callback(jobData, dispatchedAt, pastTries = 1) {
const job = JSON.parse(jobData)
const isExpired = timeout => {
return (timeout !== 0) && ((new Date - dispatchedAt) > timeout)
}

try {
if(isExpired(job.timeout)) {
return
}

await jobHandler(job)
} catch(err) {
try {
const tries = Number.parseInt(job.tries) || 1

if((pastTries >= tries) || isExpired(job.timeout)) {
throw err
}

if(job.retry_delay > 0) {
await new Promise(resolve => {
return setTimeout(() => {
return resolve()
}, job.retry_delay)
})
}
} catch(err) {
return errorHandler(job, err)
}

return callback(jobData, dispatchedAt, pastTries += 1)
}
})
}

destroy() {
this.client.isShutdown = true
this.client.constructor.queueUrls = { }
return super.destroy()
}

}
25 changes: 16 additions & 9 deletions src/QueueFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import './Drivers/BaseDriver'
import './Drivers/BeanstalkDriver'
import './Drivers/FaktoryDriver'
import './Drivers/RabbitDriver'
import './Drivers/SQSDriver'

export class QueueFactory {
app = null
Expand All @@ -16,22 +17,25 @@ export class QueueFactory {
faktory: FaktoryDriver,
rabbit: RabbitDriver,
rabbitmq: RabbitDriver,
amqp: RabbitDriver
amqp: RabbitDriver,
sqs: SQSDriver
}

constructor(app) {
this.app = app
}

dispatch(job, connection = null) {
return this.get(connection).dispatch(job)
async dispatch(job, connection = null) {
connection = await this.get(connection)
return connection.dispatch(job)
}

status(job, connection = null) {
return this.get(connection).status(job)
async status(job, connection = null) {
connection = await this.get(connection)
return connection.status(job)
}

get(connection) {
async get(connection) {
let name = null

if(connection.isNil) {
Expand Down Expand Up @@ -60,7 +64,7 @@ export class QueueFactory {
throw new Error(`Unsupported queue driver: ${config.driver}`)
}

connection = this.make(driverClass, config)
connection = await this.make(driverClass, config)

if(!name.isNil) {
this.connections[name] = connection
Expand All @@ -69,8 +73,11 @@ export class QueueFactory {
return connection
}

make(driverClass, config) {
return new Queue(this.app, this, new driverClass(this.app, config))
async make(driverClass, config) {
const driver = new driverClass(this.app, config)
await driver.ready()

return new Queue(this.app, this, driver)
}

registerDriver(name, driverClass) {
Expand Down
7 changes: 6 additions & 1 deletion src/QueueProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ import './QueueFactory'
import './Commands/MakeJobCommand'
import './Commands/QueueWorkCommand'

export function QueueProvider(app, classes = { }) {
export async function QueueProvider(app, classes = { }) {
const factoryClass = classes.factoryClass || QueueFactory
app.queue = new factoryClass(app)

for(const connectionName of Object.keys(app.config.get('queue.connections'))) {
// Trigger initial setup of backend engines
await app.queue.get(connectionName)
}

if(app.cli.isNil) {
return
}
Expand Down
Loading