Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
update event system
Browse files Browse the repository at this point in the history
  • Loading branch information
marihachi committed Oct 17, 2018
1 parent fb9153c commit 01729c8
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 207 deletions.
34 changes: 34 additions & 0 deletions demoRedisEventXevPubSub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const RedisEventEmitter = require('./src/modules/RedisEventEmitter');
const XevPubSub = require('./src/modules/XevPubSub');

// in streaming-server:

// 2) redisから流れてきた広域のイベントを、ローカルのpub/subにストリーミングTLの内容として投げる
const reciever = new RedisEventEmitter('piyo', true);
reciever.on('redis.user-tl', (data) => {
console.log('recieve a redis event. publish local event');
const publisher = new XevPubSub('hoge');
publisher.publish('local.user-tl.user1', data);
publisher.dispose();
});

// 3) ローカルのpub/subにストリーミングTLの内容が流れてくる
const subscriber = new XevPubSub('hoge');
subscriber.on('message', (channel, message) => {
// 4) WebSocketにポストを投げる
console.log('recieve local event. send by websocket');
console.log(channel + ':', message);
});
subscriber.subscribe('local.user-tl.user1');
subscriber.subscribe('local.user-tl.user2');

// in created posting:

(async () => {
// 1) ポストが投稿される
console.log('post chat. send a redis event');

const sender = new RedisEventEmitter('piyo', false);
await sender.emit('redis.user-tl', { text: 'nya' });
await sender.dispose();
})();
62 changes: 62 additions & 0 deletions src/modules/RedisEventEmitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const redis = require('redis');
const { EventEmitter } = require('events');
const { promisify } = require('util');

class RedisEventEmitter extends EventEmitter {
/**
* @param {string} namespace
* @param {boolean} isReceveMode
* @param {{host: string, port: number}} redisOptions
*/
constructor(namespace, isReceveMode, redisOptions = { host: 'localhost', port: 6379 }) {
super();
this.namespace = namespace;
this._isReceveMode = isReceveMode;
this._redis = redis.createClient(redisOptions);
this._redis.on('error', (err) => {
throw new Error(`[RedisEventEmitter] ${String(err)}`);
});
if (this._isReceveMode) {
this._redis.on('message', (namespace, json) => {
let event;
try {
event = JSON.parse(json);
}
catch (err) {
console.warn('recieved redis event is not json format.');
return;
}
if (event.event == null || event.data == null) {
return;
}
super.emit(event.event, event.data);
});
this._redis.subscribe(this.namespace, (err) => {
if (err) {
throw new Error('[RedisEventEmitter] failed to subscribe');
}
});
}
}
async emit(event, data) {
if (this._isReceveMode) {
throw new Error('emit is disable. this RedisEventEmitter is recieve mode.');
}
/** @type {(channel: string, value: string) => Promise<boolean>} */
const publish = promisify(this._redis.publish).bind(this._redis);
await publish(this.namespace, JSON.stringify({ event, data }));
}
async dispose() {
const quit = promisify(this._redis.quit).bind(this._redis);
const unsubscribe = promisify(this._redis.unsubscribe).bind(this._redis);
if (this._isReceveMode) {
await unsubscribe(this.namespace);
this.removeAllListeners();
}
if (this._redis.connected) {
await quit();
}
this._redis.removeAllListeners();
}
}
module.exports = RedisEventEmitter;
92 changes: 0 additions & 92 deletions src/modules/localStream.js

This file was deleted.

115 changes: 0 additions & 115 deletions src/modules/redisEvent.js

This file was deleted.

0 comments on commit 01729c8

Please sign in to comment.