diff --git a/.eslintignore b/.eslintignore new file mode 100644 index 0000000..e69de29 diff --git a/.eslintrc.js b/.eslintrc.js new file mode 100644 index 0000000..f5bbc06 --- /dev/null +++ b/.eslintrc.js @@ -0,0 +1,13 @@ +module.exports = { + root: true, + extends: [ + "eslint-config-2o3t" + ], + parser: 'babel-eslint', + env: { + browser: true, + node: true, + commonjs: true, + es6: true + }, +} diff --git a/index.js b/index.js new file mode 100644 index 0000000..408e10a --- /dev/null +++ b/index.js @@ -0,0 +1,13 @@ +'use strict'; + +const Client = require('./lib/Client'); +const Server = require('./lib/Server'); +const ServerChannel = Server.Channel; +const ClientChannel = require('./lib/channel/ChannelClient'); + +module.exports = { + Client, + Server, + ServerChannel, + ClientChannel, +}; diff --git a/lib/Client.js b/lib/Client.js new file mode 100644 index 0000000..5d3ed03 --- /dev/null +++ b/lib/Client.js @@ -0,0 +1,190 @@ +'use strict'; + +const { fork } = require('child_process'); +const createQueuedSender = require('./utils/createQueuedSender'); +const Delayer = require('./utils/Delayer'); +const { UnkownTypeError } = require('./utils/Error'); +const ChannelClient = require('./channel/ChannelClient'); + +// export interface IIPCOptions { + +// /** +// * A descriptive name for the server this connection is to. Used in logging. +// */ +// serverName: string; + +// /** +// * Time in millies before killing the ipc process. The next request after killing will start it again. +// */ +// timeout?: number; + +// /** +// * Arguments to the module to execute. +// */ +// args?: string[]; + +// /** +// * Environment key-value pairs to be passed to the process that gets spawned for the ipc. +// */ +// env?: any; + +// /** +// * Allows to assign a debug port for debugging the application executed. +// */ +// debug?: number; + +// /** +// * Allows to assign a debug port for debugging the application and breaking it on the first line. +// */ +// debugBrk?: number; + +// /** +// * See https://github.com/Microsoft/vscode/issues/27665 +// * Allows to pass in fresh execArgv to the forked process such that it doesn't inherit them from `process.execArgv`. +// * e.g. Launching the extension host process with `--inspect-brk=xxx` and then forking a process from the extension host +// * results in the forked process inheriting `--inspect-brk=xxx`. +// */ +// freshExecArgv?: boolean; + +// /** +// * Enables our createQueuedSender helper for this Client. Uses a queue when the internal Node.js queue is +// * full of messages - see notes on that method. +// */ +// useQueue?: boolean; +// } + +class Client { + /** + *Creates an instance of Client. + * @param {String} modulePath path + * @param {IIPCOptions} [options={}] option + * @memberof Client + */ + constructor(modulePath, options = {}) { + this.modulePath = modulePath; + const timeout = options && options.timeout ? options.timeout : 60 * 1000; + this.disposeDelayer = new Delayer(timeout); + this.channels = new Map(); + this.child = null; + this._client = null; + } + + /** + * 根据通道名称获取通道 + * @param {String} channelName name + */ + getChannel(channelName) { + let channel = this.channels.get(channelName); + + if (!channel) { + channel = this._getClientChannel(channelName); + this.channels.set(channelName, channel); + } + + return channel; + } + + get client() { + if (!this._client) { + const args = this.options && this.options.args ? this.options.args : []; + const forkOpts = Object.create(null); + + forkOpts.env = Object.assign(JSON.parse(JSON.stringify(process.env)), { PARENT_PID: String(process.pid) }); + + if (this.options && this.options.env) { + forkOpts.env = Object.assign(forkOpts.env, this.options.env); + } + + if (this.options && this.options.freshExecArgv) { + forkOpts.execArgv = []; + } + + if (this.options && typeof this.options.debug === 'number') { + forkOpts.execArgv = [ '--nolazy', '--inspect=' + this.options.debug ]; + } + + if (this.options && typeof this.options.debugBrk === 'number') { + forkOpts.execArgv = [ '--nolazy', '--inspect-brk=' + this.options.debugBrk ]; + } + + this.child = fork(this.modulePath, args, forkOpts); + + const sender = this.options && this.options.useQueue ? createQueuedSender(this.child) : this.child; + const send = message => this.child && this.child.connected && sender.send(message); + const on = this.child && this.child.connected && this.child.on.bind(this.child); + const protocol = { send, on }; + this._client = new ChannelClient(protocol, { + onZero: () => { + if (this.disposeDelayer) { + this.disposeDelayer.trigger(() => this._disposeClient()); + } + }, + }); + + const onExit = () => this._disposeClient(); + process.once('exit', onExit); // 暂不支持 background 运行 + + this.child.on('error', err => console.warn('IPC "' + this.options.serverName + '" errored with ' + err)); + + this.child.on('exit', (code, signal) => { + process.removeListener('exit', onExit); + + if (code !== 0 && signal !== 'SIGTERM') { + console.warn('IPC "' + this.options.serverName + '" crashed with exit code ' + code + ' and signal ' + signal); + } + + if (this.disposeDelayer) { + this.disposeDelayer.cancel(); + } + this._disposeClient(); + }); + } + return this._client; + } + + _getClientChannel(name) { + const that = this; + + return { + call(command, arg) { + if (!that.disposeDelayer) { + return Promise.reject(new UnkownTypeError()); + } + that.disposeDelayer.cancel(); + return that.client.getChannel(name).call(command, arg); + }, + listen(event, arg) { + if (!that.disposeDelayer) { + return function() { + return { dispose() {} }; + }; + } + + that.disposeDelayer.cancel(); + return that.client.getChannel(name).listen(event, arg); + }, + }; + } + + _disposeClient() { + if (this._client) { + if (this.child) { + this.child.kill(); + this.child = null; + } + this._client.dispose(); + this._client = null; + this.channels.clear(); + } + } + + dispose() { + if (this.disposeDelayer) { + this.disposeDelayer.cancel(); + this.disposeDelayer = null; // StrictNullOverride: nulling out ok in dispose + } + this._disposeClient(); + } +} + +module.exports = Client; diff --git a/lib/Server.js b/lib/Server.js new file mode 100644 index 0000000..0a08a7c --- /dev/null +++ b/lib/Server.js @@ -0,0 +1,21 @@ +'use strict'; + +const ChannelServer = require('./channel/ChannelServer'); +const ServerChannel = require('./channel/ServerChannel'); + +class Server extends ChannelServer { + constructor(options) { + super(process, options); + process.once('disconnect', () => this.dispose()); + } + + get info() { + const pid = process.pid; + const ppid = process.ppid; + const memoryUsage = process.memoryUsage(); + return { pid, ppid, memoryUsage }; + } +} + +Server.Channel = ServerChannel; +module.exports = Server; diff --git a/lib/channel/BaseContext.js b/lib/channel/BaseContext.js new file mode 100644 index 0000000..e407890 --- /dev/null +++ b/lib/channel/BaseContext.js @@ -0,0 +1,36 @@ +'use strict'; + +const Protocol = require('./Protocol'); + +class BaseContext { + + constructor(protocol) { + this.protocol = new Protocol(protocol); + } + + _send(header, body = undefined) { + const param = { header }; + if (body !== undefined) { + param.body = body; + } + const writers = new Buffer(JSON.stringify(param)); + this._sendBuffer(writers.toString('base64')); + } + + _sendBuffer(message) { + try { + this.protocol.send(message); + } catch (err) { + // noop + } + } + + dispose() { + if (this.protocol) { + this.protocol.dispose(); + this.protocol = null; + } + } +} + +module.exports = BaseContext; diff --git a/lib/channel/ChannelClient.js b/lib/channel/ChannelClient.js new file mode 100644 index 0000000..aa45eea --- /dev/null +++ b/lib/channel/ChannelClient.js @@ -0,0 +1,233 @@ +'use strict'; + +const BaseContext = require('./BaseContext'); +const { ResponseType, RequestType, State } = require('../utils/Type'); +const { CanceledError, UnkownTypeError } = require('../utils/Error'); +const createCancelablePromise = require('../utils/createCancelablePromise'); +const Emitter = require('../utils/Emitter'); + +// export interface IMessagePassingProtocol { +// send(message: string): void; +// on: Event; +// } + +const defaultOptions = { + onZero: () => {}, +}; + +class ChannelClient extends BaseContext { + + constructor(protocol, options = {}) { + super(protocol); + this.options = Object.assign({}, defaultOptions, options); + + this.lastRequestId = 0; + this.state = State.Uninitialized; + this.activeRequests = new Set(); + this.handlers = new Map(); + + this._protocolListener = this.protocol.onMessage(msg => this._onBuffer(msg)); + + this._lifecycleEmitter = new Emitter({ name: 'ON_DID_INITIALIZE' }); + this._onDidInitialize = this._createDidInitializeEvent(); + } + + getChannel(channelName) { + const that = this; + + return { + call(command, arg) { + return that._requestPromise(channelName, command, arg); + }, + listen(event, arg) { + return that._requestEvent(channelName, event, arg); + }, + }; + } + + _createDidInitializeEvent() { + const that = this; + return { + release(args) { + that._lifecycleEmitter.emit(args); + }, + wait() { // wait inited + if (that.state === State.Idle) { + return Promise.resolve(); + } + return new Promise(resolve => { + that._lifecycleEmitter.on(resolve); + }); + }, + }; + } + + _requestPromise(channelName, name, arg) { + const id = this.lastRequestId++; + const type = RequestType.Promise; + const request = { id, type, channelName, name, arg }; + + let disposable = null; + + const result = new Promise((c, e) => { + let uninitializedPromise = createCancelablePromise(this._onDidInitialize.wait()); + uninitializedPromise.then(() => { + uninitializedPromise = null; + + const handler = response => { + const { data, type } = response; + switch (type) { + case ResponseType.PromiseSuccess: + this.handlers.delete(id); + c(data); + break; + + case ResponseType.PromiseError: + this.handlers.delete(id); + e(new Error(data.message)); + break; + + case ResponseType.PromiseErrorObj: + this.handlers.delete(id); + e(data); + break; + default: + this.handlers.delete(id); + e(new UnkownTypeError()); + } + }; + + this.handlers.set(id, handler); + this._sendRequest(request); + }); + + const cancel = () => { + if (uninitializedPromise) { + uninitializedPromise.cancel(); + uninitializedPromise = null; + } else { + this._sendRequest({ id, type: RequestType.PromiseCancel }); + } + + e(new CanceledError()); + }; + + disposable = { + dispose: () => { + cancel(); + }, + }; + this.activeRequests.add(disposable); + }); + + return createCancelablePromise(result).finally(() => { + this.activeRequests.delete(disposable); + + if (this.activeRequests.size === 0) { + this.options && this.options.onZero(); + } + }); + } + + // 事件监听 + _requestEvent(channelName, name, arg) { + const id = this.lastRequestId++; + const type = RequestType.EventListen; + const request = { id, type, channelName, name, arg }; + + let uninitializedPromise = null; + + const emitter = new Emitter({ + onFirstListenerAdd: () => { + // first + uninitializedPromise = createCancelablePromise(this._onDidInitialize.wait()); + uninitializedPromise.then(() => { + uninitializedPromise = null; + this.activeRequests.add(emitter); + this._sendRequest(request); + }); + }, + onLastListenerRemove: () => { + if (uninitializedPromise) { + uninitializedPromise.cancel(); + uninitializedPromise = null; + } else { + this.activeRequests.delete(emitter); + this._sendRequest({ id, type: RequestType.EventDispose }); + } + + if (this.activeRequests.size === 0) { + this.options && this.options.onZero(); + } + }, + }); + + const handler = res => emitter.emit(res.data); + this.handlers.set(id, handler); + + return emitter.event; + } + + _sendRequest(request) { + const header = [ request.type, request.id ]; + switch (request.type) { + case RequestType.Promise: + case RequestType.EventListen: + return this._send([ ...header, request.channelName, request.name ], request.arg); + + case RequestType.PromiseCancel: + case RequestType.EventDispose: + return this._send(header); + default: + } + } + + // 接受 + + _onBuffer(message) { + const buffer = new Buffer(message, 'base64'); + const msg = JSON.parse(buffer.toString()); + const header = msg.header; + const body = msg.body; + const type = header[0]; + const id = header[1]; + + switch (type) { + case ResponseType.Initialize: + return this._onResponse({ type }); + + case ResponseType.PromiseSuccess: + case ResponseType.PromiseError: + case ResponseType.EventFire: + case ResponseType.PromiseErrorObj: + return this._onResponse({ type, id, data: body }); + default: + } + } + + _onResponse(response) { + if (response.type === ResponseType.Initialize) { + this.state = State.Idle; + this._onDidInitialize.release(response); + return; + } + + const handler = this.handlers.get(response.id); + + if (handler) { + handler(response); + } + } + + dispose() { + super.dispose(); + if (this._protocolListener) { + this._protocolListener = null; + } + this.activeRequests.forEach(p => p.dispose()); + this.activeRequests.clear(); + this.handlers.clear(); + } +} + +module.exports = ChannelClient; diff --git a/lib/channel/ChannelServer.js b/lib/channel/ChannelServer.js new file mode 100644 index 0000000..b4fe84c --- /dev/null +++ b/lib/channel/ChannelServer.js @@ -0,0 +1,213 @@ +'use strict'; + +const BaseContext = require('./BaseContext'); +const { ResponseType, RequestType, State } = require('../utils/Type'); +const createCancelablePromise = require('../utils/createCancelablePromise'); +const Emitter = require('../utils/Emitter'); + +const defaultOptions = { + timeoutDelay: 1000, +}; + +class ChannelServer extends BaseContext { + + constructor(protocol, options = {}) { + super(protocol); + this.options = Object.assign({}, defaultOptions, options); + + this.channels = new Map(); + this.activeRequests = new Map(); + + // Requests might come in for channels which are not yet registered. + // They will timeout after `timeoutDelay`. + this.pendingRequests = new Map(); + this.timeoutDelay = this.options.timeoutDelay; + + this._protocolListener = this.protocol.onMessage(msg => this._onRawMessage(msg)); + this._sendResponse({ type: ResponseType.Initialize }); + } + + registerChannel(channelName, channel) { + this.channels.set(channelName, channel); + this._flushPendingRequests(channelName); + } + + _sendResponse(response) { + switch (response.type) { + case ResponseType.Initialize: + return this._send([ response.type ]); + + case ResponseType.PromiseSuccess: + case ResponseType.PromiseError: + case ResponseType.EventFire: + case ResponseType.PromiseErrorObj: + return this._send([ response.type, response.id ], response.data); + default: + } + } + + _onRawMessage(message) { + const buffer = new Buffer(message, 'base64'); + const msg = JSON.parse(buffer.toString()); + const header = msg.header; + const body = msg.body; + const type = header[0]; + const id = header[1]; + const channelName = header[2]; + const name = header[3]; + + switch (type) { + case RequestType.Promise: + return this._onPromise({ type, id, channelName, name, arg: body }); + case RequestType.EventListen: + return this._onEventListen({ type, id, channelName, name, arg: body }); + case RequestType.PromiseCancel: + return this._disposeActiveRequest({ type, id }); + case RequestType.EventDispose: + return this._disposeActiveRequest({ type, id }); + default: + } + } + + _onPromise(request) { + const channel = this.channels.get(request.channelName); + + if (!channel) { + this._collectPendingRequest(request); + return; + } + + let promise; + + try { + promise = channel.call(request.name, request.arg); + } catch (err) { + promise = Promise.reject(err); + } + + if (!promise || typeof promise.then !== 'function') { + promise = Promise.reject('must be return Promise!'); + } + + const id = request.id; + + let cancelablePromise = createCancelablePromise(promise); + cancelablePromise.then(data => { + cancelablePromise = null; + this._sendResponse({ id, data, type: ResponseType.PromiseSuccess }); + this.activeRequests.delete(id); + }).catch(err => { + cancelablePromise = null; + if (err instanceof Error) { + this._sendResponse({ + id, type: ResponseType.PromiseError, + data: { + message: err.message, + name: err.name, + stack: err.stack ? (err.stack.split ? err.stack.split('\n') : err.stack) : undefined, + }, + }); + } else { + this._sendResponse({ id, data: err, type: ResponseType.PromiseErrorObj }); + } + this.activeRequests.delete(id); + }); + + const cancel = () => { + if (cancelablePromise) { + cancelablePromise.cancel(); + cancelablePromise = null; + } + }; + + const disposable = { + dispose: () => { + cancel(); + }, + }; + this.activeRequests.set(id, disposable); + } + + _onEventListen(request) { + const channel = this.channels.get(request.channelName); + + if (!channel) { + this._collectPendingRequest(request); + return; + } + + const id = request.id; + // const emitter = new Emitter({ + // name: request.name, + // }); + const event = channel.listen(request.name, request.arg); + const disposable = event(data => { + this._sendResponse({ id, data, type: ResponseType.EventFire }); + }); + + this.activeRequests.set(id, disposable); + } + + _disposeActiveRequest(request) { + const disposable = this.activeRequests.get(request.id); + + if (disposable) { + disposable.dispose(); + this.activeRequests.delete(request.id); + } + } + + _collectPendingRequest(request) { + let pendingRequests = this.pendingRequests.get(request.channelName); + + if (!pendingRequests) { + pendingRequests = []; + this.pendingRequests.set(request.channelName, pendingRequests); + } + + const timer = setTimeout(() => { + console.error(`Unknown channel: ${request.channelName}`); + + if (request.type === RequestType.Promise) { + this._sendResponse({ + id: request.id, + data: { name: 'Unknown channel', message: `Channel name '${request.channelName}' timed out after ${this.timeoutDelay}ms`, stack: undefined }, + type: ResponseType.PromiseError + }); + } + }, this.timeoutDelay); + + pendingRequests.push({ request, timeoutTimer: timer }); + } + + _flushPendingRequests(channelName) { + const requests = this.pendingRequests.get(channelName); + + if (requests) { + for (const item of requests) { + clearTimeout(item.timeoutTimer); + + switch (item.request.type) { + case RequestType.Promise: this._onPromise(item.request); break; + case RequestType.EventListen: this._onEventListen(item.request); break; + } + } + + this.pendingRequests.delete(channelName); + } + } + + dispose() { + super.dispose(); + if (this._protocolListener) { + this._protocolListener = null; + } + this.activeRequests.forEach(d => d.dispose()); + this.activeRequests.clear(); + this.channels.forEach(d => d.dispose()); + this.channels.clear(); + this.pendingRequests.clear(); + } +} + +module.exports = ChannelServer; diff --git a/lib/channel/Protocol.js b/lib/channel/Protocol.js new file mode 100644 index 0000000..ecf19cc --- /dev/null +++ b/lib/channel/Protocol.js @@ -0,0 +1,22 @@ +'use strict'; + +class Protocol { + + constructor(context) { + this._context = context; + } + + onMessage(listener) { + return this._context.on('message', listener); + } + + send(arg) { + return this._context.send(arg); + } + + dispose() { + this._context = null; + } +} + +module.exports = Protocol; diff --git a/lib/channel/ServerChannel.js b/lib/channel/ServerChannel.js new file mode 100644 index 0000000..68f943b --- /dev/null +++ b/lib/channel/ServerChannel.js @@ -0,0 +1,42 @@ +'use strict'; + +const Emitter = require('../utils/Emitter'); + +class ServerChannel { + + constructor(service) { + this.service = service; + this.emitters = new Set(); + } + + call(command, arg) { + if (this.service && this.service[command] && typeof this.service[command] === 'function') { + return this.service[command](arg); + } + return Promise.reject(new Error(`[ ${command} ] not implemented`)); + + } + + listen(event, arg) { + if (this.service && this.service[event] && typeof this.service[event] === 'function') { + const emitter = new Emitter({ name: event }); + this.emitters.add(emitter); + setTimeout(() => { // 防止同步调用 + this.service[event](emitter.emit.bind(emitter)); + }); + return emitter.event; + } + throw new Error(`[ ${command} ] not implemented`); + + } + + dispose() { + if (this.service) { + this.service = null; + } + this.emitters.forEach(d => d.dispose()); + this.emitters.clear(); + } +} + +module.exports = ServerChannel; diff --git a/lib/utils/Delayer.js b/lib/utils/Delayer.js new file mode 100644 index 0000000..c664e60 --- /dev/null +++ b/lib/utils/Delayer.js @@ -0,0 +1,75 @@ +'use strict'; + +const { CanceledError } = require('./Error'); + +class Delayer { + + constructor(defaultDelay) { + this.defaultDelay = defaultDelay; + this.timeout = null; + this.completionPromise = null; + this.doResolve = null; + this.task = null; + } + + /** + * @param {Promise} task Promise + * @param {number} [delay=this.defaultDelay] 延迟时间 + * @memberof Delayer + * @return {Promise} Promise + */ + trigger(task, delay = this.defaultDelay) { + this.task = task; + this._cancelTimeout(); + + if (!this.completionPromise) { + this.completionPromise = new Promise((c, e) => { + this.doResolve = c; + this.doReject = e; + }).then(() => { + this.completionPromise = null; + this.doResolve = null; + const task = this.task; + this.task = null; + + if (task) { + return task(); + } + return Promise.reject(new Error('task is null')); + }); + } + + this.timeout = setTimeout(() => { + this.timeout = null; + this.doResolve && this.doResolve(null); + }, delay); + + return this.completionPromise; + } + + isTriggered() { + return this.timeout !== null; + } + + cancel() { + this._cancelTimeout(); + + if (this.completionPromise) { + this.doReject(new CanceledError()); + this.completionPromise = null; + } + } + + _cancelTimeout() { + if (this.timeout !== null) { + clearTimeout(this.timeout); + this.timeout = null; + } + } + + dispose() { + this._cancelTimeout(); + } +} + +module.exports = Delayer; diff --git a/lib/utils/Emitter.js b/lib/utils/Emitter.js new file mode 100644 index 0000000..fccf8f0 --- /dev/null +++ b/lib/utils/Emitter.js @@ -0,0 +1,99 @@ +'use strict'; + +const EventEmitter = require('events').EventEmitter; + +const DEFAULT_OPTIONS = { + name: 'EVENT_NAME', + onFirstListenerAdd: () => {}, + onFirstListenerDidAdd: () => {}, + onListenerDidAdd: () => {}, + onLastListenerRemove: () => {}, + onDisposed: () => {}, +}; + +function isFunction(func) { + return typeof func === 'function'; +} + +class Emitter { + + constructor(options = {}) { + this.options = Object.assign({}, DEFAULT_OPTIONS, options); + this.activeListeners = []; + this.isDisposed = false; + this.name = this.options.name; + } + + get emitter() { + if (!this._emitter) { + this._emitter = new EventEmitter(); + } + return this._emitter; + } + + get event() { + if (!this._event) { + this._event = this.on.bind(this); + } + return this._event; + } + + emit(data) { + if (this.isDisposed) return; + return this.emitter.emit(this.name, data); + } + + on(listener) { + if (this.isDisposed) return; + const _activeListeners = this.activeListeners; + const _options = this.options; + const firstListener = _activeListeners.length < 1; + if (firstListener && _options && isFunction(_options.onFirstListenerAdd)) { + _options.onFirstListenerAdd(this); + } + _activeListeners.push(listener); + if (firstListener && _options && isFunction(_options.onFirstListenerDidAdd)) { + _options.onFirstListenerDidAdd(this); + } + if (_options && isFunction(_options.onListenerDidAdd)) { + _options.onListenerDidAdd(this, listener); + } + this.emitter.addListener(this.name, listener); + + const result = { + dispose: () => { + this.off(listener); + }, + }; + + return result; + } + + off(listener) { + if (this.isDisposed) return; + const _activeListeners = this.activeListeners; + const _options = this.options; + const index = _activeListeners.indexOf(listener); + if (index > -1) { + _activeListeners.splice(index, 1); + } + const lastListener = _activeListeners.length < 1; + if (lastListener && _options && isFunction(_options.onLastListenerRemove)) { + _options.onLastListenerRemove(this); + } + return this.emitter.removeListener(this.name, listener); + } + + dispose() { + if (this.isDisposed) return; + this.emitter.removeAllListeners(); + this.activeListeners = []; + this.isDisposed = true; + this._emitter = null; + if (this.options && isFunction(this.options.onDisposed)) { + this.options.onDisposed(this); + } + } +} + +module.exports = Emitter; diff --git a/lib/utils/Error.js b/lib/utils/Error.js new file mode 100644 index 0000000..a996c15 --- /dev/null +++ b/lib/utils/Error.js @@ -0,0 +1,19 @@ +'use strict'; + +class CanceledError extends Error { + constructor(message = 'Canceled') { + super(message); + this.isCanceled = true; + } +} + +class UnkownTypeError extends Error { + constructor(message = 'Unkown Type') { + super(message); + } +} + +module.exports = { + CanceledError, + UnkownTypeError, +}; diff --git a/lib/utils/Type.js b/lib/utils/Type.js new file mode 100644 index 0000000..6bf84d6 --- /dev/null +++ b/lib/utils/Type.js @@ -0,0 +1,25 @@ +'use strict'; + +const ResponseType = { + Initialize: 200, + PromiseSuccess: 201, + PromiseError: 202, + PromiseErrorObj: 203, + EventFire: 204, +}; + +const RequestType = { + Promise: 100, + PromiseCancel: 101, + EventListen: 102, + EventDispose: 103, +}; + +const State = { + Uninitialized: 0, + Idle: 1, +}; + +module.exports = { + ResponseType, RequestType, State, +}; diff --git a/lib/utils/createCancelablePromise.js b/lib/utils/createCancelablePromise.js new file mode 100644 index 0000000..b1b8e88 --- /dev/null +++ b/lib/utils/createCancelablePromise.js @@ -0,0 +1,30 @@ +'use strict'; + +const { CanceledError } = require('./Error'); + +const createCancelablePromise = promise => { + let _hasCanceled = false; + let _finallyListener = null; + const wrappedPromise = new Promise((resolve, reject) => { + promise.then(val => { + _hasCanceled ? reject(new CanceledError()) : resolve(val); + + typeof _finallyListener === 'function' && _finallyListener(); + }).catch(error => { + _hasCanceled ? reject(new CanceledError()) : reject(error); + + typeof _finallyListener === 'function' && _finallyListener(); + }); + }); + wrappedPromise.cancel = () => { + _hasCanceled = true; + return _hasCanceled; + }; + wrappedPromise.finally = listener => { + _finallyListener = listener; + return wrappedPromise; + }; + return wrappedPromise; +}; + +module.exports = createCancelablePromise; diff --git a/lib/utils/createQueuedSender.js b/lib/utils/createQueuedSender.js new file mode 100644 index 0000000..834a6ba --- /dev/null +++ b/lib/utils/createQueuedSender.js @@ -0,0 +1,41 @@ +'use strict'; + +let _isWindows = false; +if (typeof process === 'object') { + _isWindows = (process.platform === 'win32'); +} + +const createQueuedSender = childProcess => { + let msgQueue = []; + let useQueue = false; + + const send = function(msg) { + if (useQueue) { + msgQueue.push(msg); // add to the queue if the process cannot handle more messages + return; + } + + const result = childProcess.send(msg, error => { + if (error) { + console.error(error); // unlikely to happen, best we can do is log this error + } + + useQueue = false; // we are good again to send directly without queue + + // now send all the messages that we have in our queue and did not send yet + if (msgQueue.length > 0) { + const msgQueueCopy = msgQueue.slice(0); + msgQueue = []; + msgQueueCopy.forEach(entry => send(entry)); + } + }); + + if (!result || _isWindows /* workaround https://github.com/nodejs/node/issues/7657 */) { + useQueue = true; + } + }; + + return { send }; +}; + +module.exports = createQueuedSender; diff --git a/package.json b/package.json new file mode 100644 index 0000000..ed3aab0 --- /dev/null +++ b/package.json @@ -0,0 +1,18 @@ +{ + "name": "@2o3t/process-manager", + "version": "0.0.1", + "description": "", + "main": "index.js", + "directories": { + "lib": "lib" + }, + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "zyao89 ", + "license": "ISC", + "devDependencies": { + "eslint": "^5.16.0", + "eslint-config-2o3t": "^1.1.15" + } +} diff --git a/test/index.js b/test/index.js new file mode 100644 index 0000000..b80207d --- /dev/null +++ b/test/index.js @@ -0,0 +1,22 @@ +'use strict'; + +const PM = require('../'); +const client = new PM.Client(__dirname + '/sub-processes.js'); +const channel = client.getChannel('abc'); +channel.call('ccd', 789).then(data => { + console.log('data', data); +}); +setInterval(() => { + channel.call('ccd', 'info').then(data => { + console.log('info: ', data); + }); +}, 2000); +channel.listen('ccc')(aa => { + console.log('listen..', aa); +}); + +const http = require('http'); +http.createServer(function(request, response) { + response.writeHead(200, { 'Content-Type': 'text/plain' }); + response.end('Hello World\n'); +}).listen(8888); diff --git a/test/sub-processes.js b/test/sub-processes.js new file mode 100644 index 0000000..9c8d2ba --- /dev/null +++ b/test/sub-processes.js @@ -0,0 +1,32 @@ +'use strict'; + +// console.log(process.debugPort); +// console.log(process.arch); +// // console.log(process.channel); + +// console.log(process.env); +// console.log(process.memoryUsage()); +// console.log(`This process is pid ${process.pid}`); +// console.log(`The parent process is pid ${process.ppid}`); + +// console.log(process.title); + +const PM = require('../'); +const server = new PM.Server(); +const channel = new PM.ServerChannel({ + ccd(arg) { + console.log('arg: ', arg); + if (arg === 'info') return Promise.resolve(server.info); + return Promise.resolve(arg); + }, + ccc(emit) { + emit('100w'); + emit('101w'); + // server.dispose(); + emit('102w'); + emit('103w'); + emit('104w'); + }, +}); +console.log('server ok'); +server.registerChannel('abc', channel);