diff --git a/demoRedisEventXevPubSub.js b/demoRedisEventXevPubSub.js new file mode 100644 index 0000000..8665399 --- /dev/null +++ b/demoRedisEventXevPubSub.js @@ -0,0 +1,34 @@ +const RedisEventEmitter = require('./src/modules/RedisEventEmitter'); +const XevPubSub = require('./src/modules/XevPubSub'); + +// in streaming-server: + +// 2) redisから流れてきた広域のイベントを、ローカルのpub/subにストリーミングTLの内容として投げる +const reciever = new RedisEventEmitter('piyo', true); +reciever.on('redis.user-tl', (data) => { + console.log('recieve a redis event. publish local event'); + const publisher = new XevPubSub('hoge'); + publisher.publish('local.user-tl.user1', data); + publisher.dispose(); +}); + +// 3) ローカルのpub/subにストリーミングTLの内容が流れてくる +const subscriber = new XevPubSub('hoge'); +subscriber.on('message', (channel, message) => { + // 4) WebSocketにポストを投げる + console.log('recieve local event. send by websocket'); + console.log(channel + ':', message); +}); +subscriber.subscribe('local.user-tl.user1'); +subscriber.subscribe('local.user-tl.user2'); + +// in created posting: + +(async () => { + // 1) ポストが投稿される + console.log('post chat. send a redis event'); + + const sender = new RedisEventEmitter('piyo', false); + await sender.emit('redis.user-tl', { text: 'nya' }); + await sender.dispose(); +})(); diff --git a/src/modules/RedisEventEmitter.js b/src/modules/RedisEventEmitter.js new file mode 100644 index 0000000..00cc132 --- /dev/null +++ b/src/modules/RedisEventEmitter.js @@ -0,0 +1,62 @@ +const redis = require('redis'); +const { EventEmitter } = require('events'); +const { promisify } = require('util'); + +class RedisEventEmitter extends EventEmitter { + /** + * @param {string} namespace + * @param {boolean} isReceveMode + * @param {{host: string, port: number}} redisOptions + */ + constructor(namespace, isReceveMode, redisOptions = { host: 'localhost', port: 6379 }) { + super(); + this.namespace = namespace; + this._isReceveMode = isReceveMode; + this._redis = redis.createClient(redisOptions); + this._redis.on('error', (err) => { + throw new Error(`[RedisEventEmitter] ${String(err)}`); + }); + if (this._isReceveMode) { + this._redis.on('message', (namespace, json) => { + let event; + try { + event = JSON.parse(json); + } + catch (err) { + console.warn('recieved redis event is not json format.'); + return; + } + if (event.event == null || event.data == null) { + return; + } + super.emit(event.event, event.data); + }); + this._redis.subscribe(this.namespace, (err) => { + if (err) { + throw new Error('[RedisEventEmitter] failed to subscribe'); + } + }); + } + } + async emit(event, data) { + if (this._isReceveMode) { + throw new Error('emit is disable. this RedisEventEmitter is recieve mode.'); + } + /** @type {(channel: string, value: string) => Promise} */ + const publish = promisify(this._redis.publish).bind(this._redis); + await publish(this.namespace, JSON.stringify({ event, data })); + } + async dispose() { + const quit = promisify(this._redis.quit).bind(this._redis); + const unsubscribe = promisify(this._redis.unsubscribe).bind(this._redis); + if (this._isReceveMode) { + await unsubscribe(this.namespace); + this.removeAllListeners(); + } + if (this._redis.connected) { + await quit(); + } + this._redis.removeAllListeners(); + } +} +module.exports = RedisEventEmitter; diff --git a/src/modules/localStream.js b/src/modules/localStream.js deleted file mode 100644 index c1332c4..0000000 --- a/src/modules/localStream.js +++ /dev/null @@ -1,92 +0,0 @@ -const XevPubSub = require('./XevPubSub'); -const { EventEmitter } = require('events'); -const EventIdHelper = require('./helpers/EventIdHelper'); - -class LocalStream { - /** @param {redis.RedisClient} redisClient */ - constructor() { - this.sources = []; - this.emitter = new EventEmitter(); - this.xev = new XevPubSub('frost-api'); - this.xev.on('message', (channel, message) => { - // 自身のリスナーに対して投げる - this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message)); - // 設定した別のStreamに投げる - if (this.outgoingStreamId != null) { - this.xev.publish(this.outgoingStreamId, message); - } - }); - } - setDestination(streamId) { - this.outgoingStreamId = streamId; - } - unsetDestination() { - this.outgoingStreamId = null; - } - getSources() { - return this.sources; - } - /** @param {string} streamId */ - addSource(streamId) { - if (this.sources.indexOf(streamId) != -1) { - throw new Error('already added'); - } - this.xev.subscribe(streamId); - this.sources.push(streamId); - } - /** @param {string} streamId */ - removeSource(streamId) { - const index = this.sources.indexOf(streamId); - if (index == -1) { - throw new Error('not exist'); - } - this.xev.unsubscribe(streamId); - this.sources.splice(index, 1); - } - /** - * @param {(data: string | {[x:string]:any})=>void} listener - * @returns listener - */ - addListener(listener) { - this.emitter.addListener('data', listener); - return listener; - } - /** @param {(data: string | {[x:string]:any})=>void} listener */ - removeListener(listener) { - this.emitter.removeListener('data', listener); - } - listenerCount() { - return this.emitter.listenerCount('data'); - } - dispose() { - this.sources.map(i => this.removeSource(i)); - this.xev.dispose(); - this.xev.removeAllListeners(); - this.emitter.removeAllListeners(); - } -} - -class LocalStreamPublisher { - constructor() { - this.xev = new XevPubSub('frost-api'); - } - /** - * @param {string} type - * @param {string} publisherId - * @param {string | {[x:string]:any}} data JSON data or object - */ - publish(type, publisherId, data) { - let strData = (data instanceof String) ? data : JSON.stringify(data); - const streamEventId = EventIdHelper.buildEventId(['stream', type, publisherId]); - this.xev.publish(streamEventId, strData); - } - dispose() { - this.xev.removeAllListeners(); - this.xev.dispose(); - } -} - -module.exports = { - LocalStream, - LocalStreamPublisher -}; diff --git a/src/modules/redisEvent.js b/src/modules/redisEvent.js deleted file mode 100644 index d8f9dff..0000000 --- a/src/modules/redisEvent.js +++ /dev/null @@ -1,115 +0,0 @@ -const redis = require('redis'); -const { EventEmitter } = require('events'); - -class RedisEventReciever { - /** @param {redis.RedisClient} redisClient */ - constructor(namespace, redisOptions = { host: 'localhost', port: 6379 }) { - this.namespace = namespace; - this.emitter = new EventEmitter(); - this.redis = redis.createClient(redisOptions); - - this.redis.on('message', (channel, message) => { - // 自身のリスナーに対して投げる - this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message)); - }); - this.redis.on('error', (err) => { - throw new Error(`redis(reciever): ${String(err)}`); - }); - - this.redis.subscribe(this.namespace, (err) => { - if (err) { - throw new Error('redis: failed to subscribe'); - } - }); - } - /** - * @param {(data: string | {[x:string]:any})=>void} listener - * @returns listener - */ - addListener(listener) { - this.emitter.addListener('data', listener); - return listener; - } - /** @param {(data: string | {[x:string]:any})=>void} listener */ - removeListener(listener) { - this.emitter.removeListener('data', listener); - } - listenerCount() { - return this.emitter.listenerCount('data'); - } - /** @returns {Promise} */ - async dispose() { - const disposeRedis = () => new Promise((resolve, reject) => { - if (this.redis.connected) { - this.redis.quit((err) => { - if (err) { - return reject(err); - } - resolve(); - }); - } - }); - const unsubscribe = () => new Promise((resolve, reject) => { - this.redis.unsubscribe(this.namespace, (err) => { - if (err) { - return reject(err); - } - resolve(); - }); - }); - - await unsubscribe(); - await disposeRedis(); - this.redis.removeAllListeners(); - this.emitter.removeAllListeners(); - } -} - -class RedisEventSender { - constructor(namespace, redisOptions = { host: 'localhost', port: 6379 }) { - this.namespace = namespace; - this.redis = redis.createClient(redisOptions); - this.redis.on('error', (err) => { - throw new Error(`redis(sender): ${String(err)}`); - }); - } - /** - * @param {string} type - * @param {string} publisherId - * @param {string | {[x:string]:any}} data JSON data or object - */ - publish(eventId, data) { - return new Promise((resolve, reject) => { - let strData = JSON.stringify({ eventId, data }); - this.redis.publish(this.namespace, strData, (err) => { - if (err) { - return reject(err); - } - resolve(); - }); - }); - } - async dispose() { - const dispose = () => new Promise((resolve, reject) => { - if (this.redis.connected) { - this.redis.quit((err) => { - if (err) { - return reject(err); - } - resolve(); - }); - } - else { - resolve(); - } - }); - - await dispose(); - this.redis.removeAllListeners(); - } -} - -module.exports = { - RedisEventReciever, - RedisEventSender -};