Skip to content

Commit f44a6f8

Browse files
committed
Added a feature that kills worker flow instances when the main process is terminated.
1 parent cbdb2c5 commit f44a6f8

File tree

4 files changed

+31
-17
lines changed

4 files changed

+31
-17
lines changed

changelog.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- added `PYPELINE(name, [options])` method starts a python process
99
- added support for custom View engine helpers `DEF.helpers.myhelper`
1010
- fixed killing workers and pypelines
11+
- added a feature that kills worker flow instances when the main process is terminated
1112

1213
========================
1314
0.0.12

flow-flowstream.js

100644100755
Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ Instance.prototype.httprouting = function() {
198198
if (index !== -1) {
199199
try {
200200
opt.cookies[line.substring(0, index)] = decodeURIComponent(line.substring(index + 1));
201-
} catch (e) {}
201+
} catch {}
202202
}
203203
}
204204
}
@@ -412,6 +412,7 @@ Instance.prototype.kill = Instance.prototype.destroy = function() {
412412
setTimeout(() => exports.refresh(self.id, 'destroy'), 500);
413413
self.flow.$destroyed = true;
414414
self.flow.$terminated = true;
415+
self.flow.$socket && self.flow.$socket.close();
415416

416417
if (self.flow.isworkerthread) {
417418

@@ -1972,7 +1973,8 @@ function MAKEFLOWSTREAM(meta) {
19721973
var saveforce = function() {
19731974
saveid && clearTimeout(saveid);
19741975
saveid = null;
1975-
flow.proxy.save(flow.export2());
1976+
if (!flow.$destroyed)
1977+
flow.proxy.save(flow.export2());
19761978
};
19771979

19781980
var save = function() {
@@ -2008,9 +2010,9 @@ function MAKEFLOWSTREAM(meta) {
20082010

20092011
var refresh_components_force = function() {
20102012
timeoutrefresh = null;
2011-
if (flow.proxy.online) {
2013+
if (!flow.$destroyed && flow.proxy.online) {
20122014
flow.proxy.send({ TYPE: 'flow/components', data: flow.components(true) });
2013-
var instances = flow.export();
2015+
let instances = flow.export();
20142016
flow.proxy.send({ TYPE: 'flow/design', data: instances });
20152017
}
20162018
};
@@ -2753,10 +2755,12 @@ function MAKEFLOWSTREAM(meta) {
27532755
flow.proxy.send({ TYPE: 'flow/design', data: flow.export() }, 1, clientid);
27542756
flow.proxy.send({ TYPE: 'flow/errors', data: flow.errors }, 1, clientid);
27552757
setTimeout(function() {
2756-
flow.instances().wait(function(com, next) {
2757-
com.status();
2758-
setImmediate(next);
2759-
}, 3);
2758+
if (!flow.$destroyed) {
2759+
flow.instances().wait(function(com, next) {
2760+
com.status();
2761+
setImmediate(next);
2762+
}, 3);
2763+
}
27602764
}, 1500);
27612765
}
27622766
}

flowstream.js

100644100755
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ FP.destroy = function() {
751751

752752
self.inc(1);
753753
self.unload(function() {
754+
self.$destroyed = true;
754755
self.inc(-1);
755756
self.emit('destroy');
756757
self.meta = null;

index.js

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ global.DEF = {};
274274
try {
275275
if (!pathexists(path))
276276
F.Fs.mkdirSync(path, { recursive: true });
277-
} catch (e) {}
277+
} catch {}
278278
};
279279

280280
})(global.F);
@@ -287,7 +287,7 @@ function pathexists(filename, isfile) {
287287
try {
288288
var val = F.Fs.statSync(filename);
289289
return val ? (isfile ? val.isFile() : true) : false;
290-
} catch (e) {
290+
} catch {
291291
return false;
292292
}
293293
}
@@ -905,7 +905,7 @@ F.require = function(name) {
905905

906906
try {
907907
mod = require(name);
908-
} catch (e) {
908+
} catch {
909909
mod = require(F.Path.join(F.config.$nodemodules, name));
910910
}
911911

@@ -1155,8 +1155,11 @@ F.loadservices = function() {
11551155
F.internal.ticks++;
11561156
global.NOW = new Date();
11571157

1158-
for (let key in F.flowstreams)
1159-
F.flowstreams[key].service(F.internal.ticks);
1158+
for (let key in F.flowstreams) {
1159+
let flow = F.flowstreams[key];
1160+
if (!flow.$destroyed)
1161+
flow.service(F.internal.ticks);
1162+
}
11601163

11611164
if (F.internal.ticks == 6 || F.internal.ticks == 12)
11621165
F.TWebSocket.ping();
@@ -1258,7 +1261,7 @@ F.httpload = function(opt) {
12581261

12591262
try {
12601263
F.Fs.unlinkSync(unixsocket);
1261-
} catch (e) {}
1264+
} catch {}
12621265

12631266
if (F.isWindows && unixsocket.indexOf(SOCKETWINDOWS) === -1)
12641267
unixsocket = F.Path.join(SOCKETWINDOWS, unixsocket);
@@ -1897,7 +1900,7 @@ F.memorize = function(name, delay, skip) {
18971900

18981901
try {
18991902
data = F.Fs.readFileSync(filename, 'utf8').parseJSON(true);
1900-
} catch (e) {}
1903+
} catch {}
19011904

19021905
var replacer;
19031906
var timeout;
@@ -2496,13 +2499,18 @@ F.exit = function(signal = 15) {
24962499
} catch {}
24972500
}
24982501

2499-
25002502
for (let m of F.pypelines) {
25012503
try {
25022504
m.kill(signal);
25032505
} catch {}
25042506
}
25052507

2508+
for (let key in Flow.instances) {
2509+
try {
2510+
Flow.instances[key].destroy();
2511+
} catch {}
2512+
}
2513+
25062514
let key = 'exit';
25072515

25082516
F.$events[key] && F.emit(key, signal);
@@ -2583,7 +2591,7 @@ F.decrypt = function(value, key, tojson = true) {
25832591
CONCAT[0] = decipher.update(Buffer.from(value || '', 'hex'));
25842592
CONCAT[1] = decipher.final();
25852593
response = Buffer.concat(CONCAT).toString('utf8');
2586-
} catch (e) {
2594+
} catch {
25872595
response = null;
25882596
}
25892597
} else

0 commit comments

Comments
 (0)