This repository was archived by the owner on Oct 30, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstart-child-worker.js
107 lines (96 loc) · 3.29 KB
/
start-child-worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
'use strict'
let rJob = require('./index')
const config = require('config')
let registerWorker = config.get('registerWorker')
const pino = require('pino')
const PINO = config.get('pino')
// Let the `getJobModuleApiURL` environment variable override the configured base URL.
// The override goes onto a shallow copy rather than onto the object returned by
// config.get(): node-config documents configuration objects as immutable once get()
// has been called, so an in-place assignment can throw a TypeError in strict mode.
const envJobModuleApiURL = process.env.getJobModuleApiURL
if (envJobModuleApiURL !== undefined && envJobModuleApiURL !== '') {
  registerWorker = Object.assign({}, registerWorker, {
    getJobModuleApiURL: envJobModuleApiURL
  })
}
/**
 * Logs a message received on this process's 'message' event.
 *
 * @param {*} message - Payload delivered with the event.
 */
function logParentMessage (message) {
  const logger = pino(PINO)
  logger.info('CHILD got message:', message)
}
process.on('message', logParentMessage)
const vm = require('vm')
var rp = require('request-promise')
// JobExecute is a process-wide global: the script compiled in executeWorker assigns it,
// and runWorker calls it for every job taken from the queue.
// The bare property read below defines nothing; it has no runtime effect.
global.JobExecute
/**
 * Requests the job-handler source registered for a job type.
 *
 * The request URL is `registerWorker.getJobModuleApiURL` with `jobType`
 * appended verbatim; no separator or escaping is added here.
 *
 * @param {string} jobType - Job type identifier appended to the base URL.
 * @returns {Promise<*>} Settles exactly as the `rp` request does: resolves with
 *   its result (presumably the handler source text - confirm against the API)
 *   and rejects with its error.
 */
async function getJobTypeWorkerProcess (jobType) {
  // An async function already yields a native Promise and turns a synchronous
  // throw into a rejection, so the request promise is returned directly.
  return rp(`${registerWorker.getJobModuleApiURL}${jobType}`)
}
/**
 * Attaches this process to a job queue and runs every job through the global
 * `JobExecute` handler.
 *
 * Flow:
 *  - `rJob.getJobQueue(options)` resolves to an object whose `q` is the queue.
 *  - Each job is handed to `JobExecute(job)`. On success `next(null, result)`
 *    is called. On rejection the failure is logged and the stored job is
 *    re-scheduled with `reanimateJob` two minutes in the future.
 *  - When the queue emits 'idle' the parent is told
 *    `{ subprocess: 'exit', pid }` and the process exits.
 *
 * NOTE(review): on rejection `next` is never called; that is kept as-is because
 * the queue's contract is not visible from this file - confirm it is intended.
 *
 * @param {object} options - Queue options; `options.queue.name` is used in logs
 *   and the whole object is forwarded to `rJob.getJobQueue`.
 * @returns {undefined} Work continues asynchronously; errors are logged.
 */
const runWorker = function (options) {
  // One logger for the whole worker instead of a new pino instance per log line.
  const log = pino(PINO)
  log.info("Worker Start", options)

  rJob.getJobQueue(options).then(result => {
    try {
      log.info('job queue object created', options.queue.name)
      const queue = result.q

      queue.process(async (job, next, onCancel) => {
        try {
          JobExecute(job)
            .then(outcome => {
              log.info('worker done', options.queue.name)
              next(null, outcome)
            })
            .catch(failure => {
              // Log what actually went wrong. The executeWorker wrapper rejects
              // with { error, jobdata } when the job body throws; job code may
              // reject with any other value, which is logged unchanged.
              log.error(failure && failure.error ? failure.error : failure)
              log.error('worker error', options.queue.name)

              queue.getJob(job.id).then((savedJobs) => {
                // Re-schedule the stored job two minutes from now.
                const processDate = new Date(Date.now() + (2 * 60 * 1000))
                savedJobs[0].status = 'active'
                return queue.reanimateJob(savedJobs[0], processDate)
              }).catch(reanimateError => log.error(reanimateError))
            })
          log.info('worker name', options.queue.name)
        } catch (err) {
          // Only reached when JobExecute throws synchronously (or is missing).
          log.error('handle by try-catch', options.queue.name)
          return next(err)
        }
      })

      queue.on('idle', (queueId) => {
        log.info('Queue is idle: ' + queueId)
        log.info("worker process id :", process.pid)

        const exitNow = () => process.exit()

        // process.send only exists when this process was spawned with an IPC channel.
        if (typeof process.send !== 'function') {
          exitNow()
          return
        }
        try {
          // process.send is asynchronous: exit from its callback so the
          // notification is handed to the channel before the process ends.
          process.send({ 'subprocess': 'exit', 'pid': process.pid }, exitNow)
        } catch (sendError) {
          log.error(sendError)
          exitNow()
        }
      })
    } catch (e) {
      log.error(e)
    }
  }).catch(e => log.error(e))
}
// Expose runWorker to any module that requires this file.
module.exports.runWorker = runWorker
let executeWorker = async function (jobType, options) {
try {
let jobProcessCode = await getJobTypeWorkerProcess(jobType) // `function (job,next){pino(PINO).info("dynamic job process load")};`
const script = new vm.Script(`
(function(require) {
JobExecute = function(job) {
return new Promise(
async (resolve, reject) =>
{
try {
` + jobProcessCode + `
} catch(err) {
reject({ error: err,jobdata:job})
}
}
)
}
})`,
{ filename: 'jobProcessTrace.vm' })
script.runInThisContext()(require)
runWorker(options)
pino(PINO).info('Child Process as Worker Executed')
} catch (e) {
pino(PINO).error(e)
pino(PINO).error('unable to load child worker.')
}
}
// Script entry point: argv[2] is the job type, argv[3] the worker options encoded as JSON.
const [, , jobTypeArg, optionsJson] = process.argv
executeWorker(jobTypeArg, JSON.parse(optionsJson))