Skip to content
This repository was archived by the owner on Sep 22, 2020. It is now read-only.

Commit fb8eb58

Browse files
committed
Added support for Faktory (https://github.com/contribsys/faktory)
1 parent 584832f commit fb8eb58

File tree

3 files changed

+209
-1
lines changed

3 files changed

+209
-1
lines changed

package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
"keywords": [
1313
"grindjs",
1414
"grind-framework",
15+
"rabbitmq",
16+
"beanstalk",
17+
"ampq",
1518
"kue",
16-
"queue"
19+
"queue",
20+
"faktory"
1721
],
1822
"peerDependencies": {
1923
"grind-cli": "^0.7.0",
@@ -28,6 +32,7 @@
2832
"eslint": "^3.3.1",
2933
"eslint-config-grind": "^2.0.0",
3034
"eslint-plugin-import-auto-name": "^1.0.3",
35+
"faktory-worker": "^0.6.3",
3136
"fivebeans": "^1.5.0",
3237
"grind-cli": "^0.7.0",
3338
"grind-framework": "^0.7.1"

src/Drivers/FaktoryDriver.js

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import './BaseDriver'
2+
import { MissingPackageError } from 'grind-framework'
3+
4+
let Client = null
5+
let Manager = null
6+
7+
/**
8+
* Loads the faktory-client + faktory-worker packages
9+
* or throws an error if they haven’t been added
10+
*/
11+
function loadPackage() {
12+
if(!Client.isNil) {
13+
return
14+
}
15+
16+
try {
17+
Manager = require('faktory-worker/lib/manager')
18+
} catch(err) {
19+
throw new MissingPackageError('faktory-worker')
20+
}
21+
22+
try {
23+
Client = require('faktory-client')
24+
} catch(err) {
25+
throw new MissingPackageError('faktory-worker')
26+
}
27+
}
28+
29+
/**
30+
* Faktory backed Queue Driver
31+
*/
32+
export class FaktoryDriver extends BaseDriver {
33+
34+
connection = null
35+
channel = null
36+
uri = null
37+
38+
constructor(app, config) {
39+
super(app, config)
40+
41+
loadPackage()
42+
43+
this.config = config
44+
}
45+
46+
connect() {
47+
this.client = new Client(this.config)
48+
return this.client.connect()
49+
}
50+
51+
dispatch(job) {
52+
const payload = this.buildPayload(job)
53+
const at = payload.delay.isNil ? null : (new Date(Date.now() + payload.delay))
54+
payload.try = 1
55+
payload.queued_at = (at || new Date).getTime()
56+
57+
return this._push(payload, at)
58+
}
59+
60+
async listen(queues, concurrency, jobHandler, errorHandler) {
61+
this.manager = new Manager({
62+
...this.config,
63+
queues,
64+
concurrency,
65+
registry: {
66+
'grind-job': this._receiveMessage.bind(this, jobHandler, errorHandler)
67+
}
68+
})
69+
70+
return this.manager.run()
71+
}
72+
73+
async _receiveMessage(jobHandler, errorHandler, payload) {
74+
try {
75+
await jobHandler(payload)
76+
} catch(err) {
77+
try {
78+
await this._retryMessageOrRethrow(payload, err)
79+
} catch(err2) {
80+
await errorHandler(payload, err)
81+
}
82+
}
83+
}
84+
85+
async _retryMessageOrRethrow(payload, err) {
86+
const tries = Number(payload.tries) || 1
87+
88+
if(tries <= 1) {
89+
throw err
90+
}
91+
92+
const tryCount = Number(payload.try) || 1
93+
94+
if(tryCount >= tries) {
95+
throw err
96+
}
97+
98+
const timeout = Number(payload.timeout) || 0
99+
100+
if(timeout > 0) {
101+
const at = Number(payload.at) || 0
102+
103+
if(at > 0 && (Date.now() + timeout) > at) {
104+
throw err
105+
}
106+
}
107+
108+
const delay = payload.retry_delay || payload.delay
109+
const at = delay.isNil ? null : (new Date(Date.now() + delay))
110+
payload.try = tryCount + 1
111+
112+
return this._push(payload, at)
113+
}
114+
115+
_push(payload, at) {
116+
return this.client.push({
117+
jobtype: 'grind-job',
118+
queue: payload.queue,
119+
retry: 0,
120+
at: at === null ? null : at.toISOString(),
121+
args: [ payload ]
122+
})
123+
}
124+
125+
async destroy() {
126+
if(!this.manager.isNil) {
127+
await this.manager.stop()
128+
this.manager = null
129+
}
130+
131+
if(!this.client.isNil) {
132+
await this.client.close()
133+
this.client = null
134+
}
135+
}
136+
137+
}

test/FaktoryDriver.js

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { serial as test } from 'ava'
2+
3+
import './helpers/TestJob'
4+
import './helpers/Listener'
5+
import './helpers/Service'
6+
7+
import '../src/Drivers/FaktoryDriver'
8+
9+
const service = new Service(test, 'faktory', {
10+
image: 'contribsys/faktory',
11+
port: 7419
12+
})
13+
14+
test.beforeEach(t => {
15+
t.context.driver = new FaktoryDriver(null, {
16+
host: 'localhost',
17+
port: service.port
18+
})
19+
20+
return t.context.driver.connect()
21+
})
22+
23+
test.afterEach.always(t => t.context.driver.destroy())
24+
25+
test('dispatch', async t => {
26+
const payload = { time: Date.now() }
27+
const job = new TestJob({ ...payload })
28+
29+
await t.context.driver.dispatch(job)
30+
31+
return Listener(t.context.driver, job => t.deepEqual(job.data.data, payload))
32+
})
33+
34+
test('delayed dispatch', async t => {
35+
const payload = { time: Date.now() }
36+
const job = new TestJob({ ...payload })
37+
38+
const dispatchedAt = Date.now()
39+
await t.context.driver.dispatch(job.$delay(5000))
40+
41+
return Listener(t.context.driver, job => {
42+
// NOTE: Faktory’s spec says it will dispatch "within a few seconds"
43+
// of the delay time, so for test purposes, we can only ensure
44+
// that there was a delay at all, rather than the requested delay
45+
// as it does not appear to fully honor the request.
46+
t.is(Date.now() - dispatchedAt >= 1000, true)
47+
t.deepEqual(job.data.data, payload)
48+
})
49+
})
50+
51+
test('retry dispatch', async t => {
52+
const payload = { time: Date.now() }
53+
const job = new TestJob({ ...payload })
54+
let tries = 0
55+
56+
await t.context.driver.dispatch(job.$tries(2))
57+
58+
return Listener(t.context.driver, job => {
59+
t.is(job.tries, 2)
60+
t.deepEqual(job.data.data, payload)
61+
62+
if(++tries === 1 || tries > 2) {
63+
throw new Error
64+
}
65+
})
66+
})

0 commit comments

Comments
 (0)