From fb9153c9c755b49346768544f737937c036e0b67 Mon Sep 17 00:00:00 2001 From: marihachi Date: Tue, 16 Oct 2018 23:05:39 +0900 Subject: [PATCH] update --- src/modules/helpers/EventIdHelper.js | 30 ++ src/modules/localStream.js | 92 ++++++ src/modules/redisEvent.js | 115 ++++++++ src/modules/stream.js | 297 -------------------- src/routes/posts/post_status.js | 16 +- src/routes/users/id/followings/target_id.js | 29 +- src/streamingServer.js | 65 ++++- 7 files changed, 314 insertions(+), 330 deletions(-) create mode 100644 src/modules/helpers/EventIdHelper.js create mode 100644 src/modules/localStream.js create mode 100644 src/modules/redisEvent.js delete mode 100644 src/modules/stream.js diff --git a/src/modules/helpers/EventIdHelper.js b/src/modules/helpers/EventIdHelper.js new file mode 100644 index 0000000..7789e5f --- /dev/null +++ b/src/modules/helpers/EventIdHelper.js @@ -0,0 +1,30 @@ +class EventIdHelper { + /** + * @param {string} eventType + * @param {string[]} params + */ + static buildEventId(params) { + return params.join('.'); + } + /** @param {string} eventId */ + static parseEventId(eventId) { + return eventId.split('.'); + } + /** + * @param {string} eventId + * @param {string[]} partialParams + */ + static contain(eventId, partialParams) { + const params = EventIdUtil.parseEventId(eventId); + if (partialParams > params) { + return false; + } + for (let i = 0; i < partialParams.length; i++) { + if (partialParams[i] != params[i]) { + return false; + } + } + return true; + } +} +module.exports = EventIdHelper; diff --git a/src/modules/localStream.js b/src/modules/localStream.js new file mode 100644 index 0000000..c1332c4 --- /dev/null +++ b/src/modules/localStream.js @@ -0,0 +1,92 @@ +const XevPubSub = require('./XevPubSub'); +const { EventEmitter } = require('events'); +const EventIdHelper = require('./helpers/EventIdHelper'); + +class LocalStream { + /** @param {redis.RedisClient} redisClient */ + constructor() { + this.sources = []; + this.emitter = new EventEmitter(); + this.xev = new XevPubSub('frost-api'); + this.xev.on('message', (channel, message) => { + // 自身のリスナーに対して投げる + this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message)); + // 設定した別のStreamに投げる + if (this.outgoingStreamId != null) { + this.xev.publish(this.outgoingStreamId, message); + } + }); + } + setDestination(streamId) { + this.outgoingStreamId = streamId; + } + unsetDestination() { + this.outgoingStreamId = null; + } + getSources() { + return this.sources; + } + /** @param {string} streamId */ + addSource(streamId) { + if (this.sources.indexOf(streamId) != -1) { + throw new Error('already added'); + } + this.xev.subscribe(streamId); + this.sources.push(streamId); + } + /** @param {string} streamId */ + removeSource(streamId) { + const index = this.sources.indexOf(streamId); + if (index == -1) { + throw new Error('not exist'); + } + this.xev.unsubscribe(streamId); + this.sources.splice(index, 1); + } + /** + * @param {(data: string | {[x:string]:any})=>void} listener + * @returns listener + */ + addListener(listener) { + this.emitter.addListener('data', listener); + return listener; + } + /** @param {(data: string | {[x:string]:any})=>void} listener */ + removeListener(listener) { + this.emitter.removeListener('data', listener); + } + listenerCount() { + return this.emitter.listenerCount('data'); + } + dispose() { + this.sources.map(i => this.removeSource(i)); + this.xev.dispose(); + this.xev.removeAllListeners(); + this.emitter.removeAllListeners(); + } +} + +class LocalStreamPublisher { + constructor() { + this.xev = new XevPubSub('frost-api'); + } + /** + * @param {string} type + * @param {string} publisherId + * @param {string | {[x:string]:any}} data JSON data or object + */ + publish(type, publisherId, data) { + let strData = (data instanceof String) ? data : JSON.stringify(data); + const streamEventId = EventIdHelper.buildEventId(['stream', type, publisherId]); + this.xev.publish(streamEventId, strData); + } + dispose() { + this.xev.removeAllListeners(); + this.xev.dispose(); + } +} + +module.exports = { + LocalStream, + LocalStreamPublisher +}; diff --git a/src/modules/redisEvent.js b/src/modules/redisEvent.js new file mode 100644 index 0000000..d8f9dff --- /dev/null +++ b/src/modules/redisEvent.js @@ -0,0 +1,115 @@ +const redis = require('redis'); +const { EventEmitter } = require('events'); + +class RedisEventReciever { + /** @param {redis.RedisClient} redisClient */ + constructor(namespace, redisOptions = { host: 'localhost', port: 6379 }) { + this.namespace = namespace; + this.emitter = new EventEmitter(); + this.redis = redis.createClient(redisOptions); + + this.redis.on('message', (channel, message) => { + // 自身のリスナーに対して投げる + this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message)); + }); + this.redis.on('error', (err) => { + throw new Error(`redis(reciever): ${String(err)}`); + }); + + this.redis.subscribe(this.namespace, (err) => { + if (err) { + throw new Error('redis: failed to subscribe'); + } + }); + } + /** + * @param {(data: string | {[x:string]:any})=>void} listener + * @returns listener + */ + addListener(listener) { + this.emitter.addListener('data', listener); + return listener; + } + /** @param {(data: string | {[x:string]:any})=>void} listener */ + removeListener(listener) { + this.emitter.removeListener('data', listener); + } + listenerCount() { + return this.emitter.listenerCount('data'); + } + /** @returns {Promise} */ + async dispose() { + const disposeRedis = () => new Promise((resolve, reject) => { + if (this.redis.connected) { + this.redis.quit((err) => { + if (err) { + return reject(err); + } + resolve(); + }); + } + }); + const unsubscribe = () => new Promise((resolve, reject) => { + this.redis.unsubscribe(this.namespace, (err) => { + if (err) { + return reject(err); + } + resolve(); + }); + }); + + await unsubscribe(); + await disposeRedis(); + this.redis.removeAllListeners(); + this.emitter.removeAllListeners(); + } +} + +class RedisEventSender { + constructor(namespace, redisOptions = { host: 'localhost', port: 6379 }) { + this.namespace = namespace; + this.redis = redis.createClient(redisOptions); + this.redis.on('error', (err) => { + throw new Error(`redis(sender): ${String(err)}`); + }); + } + /** + * @param {string} type + * @param {string} publisherId + * @param {string | {[x:string]:any}} data JSON data or object + */ + publish(eventId, data) { + return new Promise((resolve, reject) => { + let strData = JSON.stringify({ eventId, data }); + this.redis.publish(this.namespace, strData, (err) => { + if (err) { + return reject(err); + } + resolve(); + }); + }); + } + async dispose() { + const dispose = () => new Promise((resolve, reject) => { + if (this.redis.connected) { + this.redis.quit((err) => { + if (err) { + return reject(err); + } + resolve(); + }); + } + else { + resolve(); + } + }); + + await dispose(); + this.redis.removeAllListeners(); + } +} + +module.exports = { + RedisEventReciever, + RedisEventSender +}; diff --git a/src/modules/stream.js b/src/modules/stream.js deleted file mode 100644 index 38ef880..0000000 --- a/src/modules/stream.js +++ /dev/null @@ -1,297 +0,0 @@ -const XevPubSub = require('./XevPubSub'); -const redis = require('redis'); -const { EventEmitter } = require('events'); - -class EventIdUtil { - /** - * @param {string} eventType - * @param {string[]} params - */ - static buildEventId(eventType, params) { - return `${eventType}:${params.join(':')}`; - } - /** @param {string} eventId */ - static parseEventId(eventId) { - const elements = eventId.split(':'); - const appendedParams = []; - if (elements.length >= 2) { - appendedParams.push(...elements.slice(1)); - } - return { - eventType: elements[0], - appendedParams: appendedParams - }; - } -} - -class StreamEventIdUtil { - /** - * @param {string} streamType - * @param {string} publisherId - */ - static buildStreamEventId(streamType, publisherId) { - return EventIdUtil.buildEventId('stream', [streamType, publisherId]); - } - /** @param {string} eventId */ - static parseStreamEventId(eventId) { - const elements = EventIdUtil.parseEventId(eventId); - if (elements.eventType != 'stream') { - throw new Error('eventType is not stream'); - } - if (elements.appendedParams.length != 2) { - throw new Error('length of appendedParams is invalid'); - } - return { - streamType: elements.appendedParams[0], - publisherId: elements.appendedParams[1] - }; - } -} - -class XevStream { - /** @param {redis.RedisClient} redisClient */ - constructor() { - this.sources = []; - this.emitter = new EventEmitter(); - this.xev = new XevPubSub('frost-api'); - this.xev.on('message', (channel, message) => { - // 自身のリスナーに対して投げる - this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message)); - // 設定した別のStreamに投げる - if (this.outgoingStreamId != null) { - this.xev.publish(this.outgoingStreamId, message); - } - }); - } - setDestination(streamId) { - this.outgoingStreamId = streamId; - } - unsetDestination() { - this.outgoingStreamId = null; - } - getSources() { - return this.sources; - } - /** @param {string} streamId */ - addSource(streamId) { - if (this.sources.indexOf(streamId) != -1) { - throw new Error('already added'); - } - this.xev.subscribe(streamId); - this.sources.push(streamId); - } - /** @param {string} streamId */ - removeSource(streamId) { - const index = this.sources.indexOf(streamId); - if (index == -1) { - throw new Error('not exist'); - } - this.xev.unsubscribe(streamId); - this.sources.splice(index, 1); - } - /** - * @param {(data: string | {[x:string]:any})=>void} listener - * @returns listener - */ - addListener(listener) { - this.emitter.addListener('data', listener); - return listener; - } - /** @param {(data: string | {[x:string]:any})=>void} listener */ - removeListener(listener) { - this.emitter.removeListener('data', listener); - } - listenerCount() { - return this.emitter.listenerCount('data'); - } - dispose() { - this.sources.map(i => this.removeSource(i)); - this.xev.dispose(); - this.xev.removeAllListeners(); - this.emitter.removeAllListeners(); - } -} - -class XevStreamPublisher { - constructor() { - this.xev = new XevPubSub('frost-api'); - } - /** - * @param {string} type - * @param {string} publisherId - * @param {string | {[x:string]:any}} data JSON data or object - */ - publish(type, publisherId, data) { - let strData = (data instanceof String) ? data : JSON.stringify(data); - const streamEventId = StreamEventIdUtil.buildStreamEventId(type, publisherId); - this.xev.publish(streamEventId, strData); - } - dispose() { - this.xev.removeAllListeners(); - this.xev.dispose(); - } -} - -class RedisStream { - /** @param {redis.RedisClient} redisClient */ - constructor(redisOptions = { host: 'localhost', port: 6379 }) { - this.sources = []; - this.emitter = new EventEmitter(); - this.redis = { - sub: redis.createClient(redisOptions), - pub: redis.createClient(redisOptions) - }; - this.redis.sub.on('message', (channel, message) => { - // 自身のリスナーに対して投げる - this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message)); - // 設定した別のStreamに投げる - if (this.outgoingStreamId != null) { - this.redis.pub.publish(this.outgoingStreamId, message); - } - }); - this.redis.sub.on('error', (err) => { - throw new Error(`redis(sub): ${String(err)}`); - }); - this.redis.pub.on('error', (err) => { - throw new Error(`redis(pub): ${String(err)}`); - }); - } - setDestination(streamId) { - this.outgoingStreamId = streamId; - } - unsetDestination() { - this.outgoingStreamId = null; - } - getSources() { - return this.sources; - } - /** @param {string} streamId */ - addSource(streamId) { - return new Promise((resolve, reject) => { - if (this.sources.indexOf(streamId) != -1) { - throw new Error('already added'); - } - this.redis.sub.subscribe(streamId, (err) => { - if (err) { - return reject(err); - } - this.sources.push(streamId); - resolve(); - }); - }); - } - /** @param {string} streamId */ - removeSource(streamId) { - return new Promise((resolve, reject) => { - const index = this.sources.indexOf(streamId); - if (index == -1) { - throw new Error('not exist'); - } - this.redis.sub.unsubscribe(streamId, (err) => { - if (err) { - return reject(err); - } - this.sources.splice(index, 1); - resolve(); - }); - }); - } - /** - * @param {(data: string | {[x:string]:any})=>void} listener - * @returns listener - */ - addListener(listener) { - this.emitter.addListener('data', listener); - return listener; - } - /** @param {(data: string | {[x:string]:any})=>void} listener */ - removeListener(listener) { - this.emitter.removeListener('data', listener); - } - listenerCount() { - return this.emitter.listenerCount('data'); - } - /** @returns {Promise} */ - async dispose() { - const disposeRedisSub = () => new Promise((resolve, reject) => { - if (this.redis.sub.connected) { - this.redis.sub.quit((err) => { - if (err) { - return reject(err); - } - resolve(); - }); - } - }); - const disposeRedisPub = () => new Promise((resolve, reject) => { - if (this.redis.pub.connected) { - this.redis.pub.quit((err) => { - if (err) { - return reject(err); - } - resolve(); - }); - } - }); - - await Promise.all(this.sources.map(i => this.removeSource(i))); - await disposeRedisSub(); - await disposeRedisPub(); - this.redis.sub.removeAllListeners(); - this.redis.pub.removeAllListeners(); - this.emitter.removeAllListeners(); - } -} - -class RedisStreamPublisher { - constructor(redisOptions = { host: 'localhost', port: 6379 }) { - this.redisPub = redis.createClient(redisOptions); - this.redisPub.on('error', (err) => { - throw new Error(`stream(pub): ${String(err)}`); - }); - } - /** - * @param {string} type - * @param {string} publisherId - * @param {string | {[x:string]:any}} data JSON data or object - */ - publish(type, publisherId, data) { - return new Promise((resolve, reject) => { - let strData = (data instanceof String) ? data : JSON.stringify(data); - const streamEventId = StreamEventIdUtil.buildStreamEventId(type, publisherId); - this.redisPub.publish(streamEventId, strData, (err) => { - if (err) { - return reject(err); - } - resolve(); - }); - }); - } - async dispose() { - const dispose = () => new Promise((resolve, reject) => { - if (this.redisPub.connected) { - this.redisPub.quit((err) => { - if (err) { - return reject(err); - } - resolve(); - }); - } - else { - resolve(); - } - }); - - await dispose(); - this.redisPub.removeAllListeners(); - } -} - -module.exports = { - EventIdUtil, - StreamEventIdUtil, - XevStream, - XevStreamPublisher, - RedisStream, - RedisStreamPublisher -}; diff --git a/src/routes/posts/post_status.js b/src/routes/posts/post_status.js index ec9cc63..bdef3a6 100644 --- a/src/routes/posts/post_status.js +++ b/src/routes/posts/post_status.js @@ -1,5 +1,6 @@ const ApiContext = require('../../modules/ApiContext'); -const { XevStreamPublisher : StreamPublisher } = require('../../modules/stream'); +const EventIdHelper = require('../../modules/helpers/EventIdHelper'); +const { RedisEventSender } = require('../../modules/redisEvent'); const $ = require('cafy').default; const MongoAdapter = require('../../modules/MongoAdapter'); @@ -33,13 +34,12 @@ exports.post = async (apiContext) => { const serializedPostStatus = await apiContext.postsService.serialize(postStatus, true); - // 各種ストリームに発行 - const publisher = new StreamPublisher(); - await Promise.all([ - publisher.publish('user-timeline-status', apiContext.user._id.toString(), serializedPostStatus), - publisher.publish('general-timeline-status', 'general', serializedPostStatus) - ]); - await publisher.dispose(); + // event.posting.chat を発行 + const eventSender = new RedisEventSender('frost-api'); + await eventSender.publish(EventIdHelper.buildEventId(['event', 'posting', 'chat']), { + posting: postStatus + }); + await eventSender.dispose(); apiContext.response(200, { postStatus: serializedPostStatus }); }; diff --git a/src/routes/users/id/followings/target_id.js b/src/routes/users/id/followings/target_id.js index 1490a62..8d24e87 100644 --- a/src/routes/users/id/followings/target_id.js +++ b/src/routes/users/id/followings/target_id.js @@ -1,5 +1,6 @@ const ApiContext = require('../../../../modules/ApiContext'); -const { StreamEventIdUtil } = require('../../../../modules/stream'); +const { RedisEventSender } = require('../../../../modules/redisEvent'); +const EventIdHelper = require('../../../../modules/helpers/EventIdHelper'); // const $ = require('cafy').default; /** @param {ApiContext} apiContext */ @@ -88,11 +89,14 @@ exports.put = async (apiContext) => { return; } - // 対象ユーザーのストリームを購読 - const stream = apiContext.streams.get(StreamEventIdUtil.buildStreamEventId('user-timeline-status', sourceUserId.toString())); - if (stream != null) { - stream.addSource(targetUserId.toString()); // この操作は冪等 - } + // event.following を発行 + const eventSender = new RedisEventSender('frost-api'); + await eventSender.publish(EventIdHelper.buildEventId(['event', 'following']), { + following: true, + sourceId: sourceUserId, + targetId: targetUserId + }); + await eventSender.dispose(); apiContext.response(200, 'following'); }; @@ -130,11 +134,14 @@ exports.delete = async (apiContext) => { // ignore } - // 対象ユーザーのストリームを購読解除 - const stream = apiContext.streams.get(StreamEventIdUtil.buildStreamEventId('user-timeline-status', soruceUser._id.toString())); - if (stream != null) { - stream.removeSource(targetUser._id.toString()); - } + // event.following を発行 + const eventSender = new RedisEventSender('frost-api'); + await eventSender.publish(EventIdHelper.buildEventId(['event', 'following']), { + following: false, + sourceUserId, + targetUserId + }); + await eventSender.dispose(); apiContext.response(200, { following: false }); }; diff --git a/src/streamingServer.js b/src/streamingServer.js index 83e124d..fe597b0 100644 --- a/src/streamingServer.js +++ b/src/streamingServer.js @@ -1,7 +1,9 @@ const WebSocket = require('websocket'); const events = require('websocket-events'); const MongoAdapter = require('./modules/MongoAdapter'); -const { XevStream : Stream, StreamEventIdUtil } = require('./modules/stream'); +const EventIdHelper = require('./modules/helpers/EventIdHelper'); +const { LocalStream, LocalStreamPublisher } = require('./modules/localStream'); +const { RedisEventReciever } = require('./modules/redisEvent'); const methods = require('methods'); const ApiContext = require('./modules/ApiContext'); const TokensService = require('./services/TokensService'); @@ -12,26 +14,61 @@ const sanitize = require('mongo-sanitize'); # 各種変数の説明 streamType: 'user-timeline-status' | 'home-timeline-status' | 'general-timeline-status' streamPublisher: ストリームの発行者情報 -streamId : StreamEventIdUtil.buildStreamEventId(streamType, streamPublisher) ストリームの識別子 -streams: Map 全てのストリーム一覧 +streamId: EventIdHelper.buildEventId(['stream', streamType, streamPublisher]) ストリームの識別子 +streams: Map 全てのストリーム一覧 connectedStreamIds: streamId[] 接続済みのストリーム名一覧 # streamIdの例 -general-timeline-status:general generalに向けて流されたポストを受信可能なStreamです -home-timeline-status:(userId) そのユーザーのホームTLに向けて流されたポストを受信可能なStreamです +general-timeline-status:general generalに向けて流されたポストを受信可能なLocalStreamです +home-timeline-status:(userId) そのユーザーのホームTLに向けて流されたポストを受信可能なLocalStreamです +*/ + +/* + +// イベント受信とその処理を追加: event.following.user follow +const eventReciever = new RedisEventReciever('frost-api'); +eventReciever.addListener((data) => { + // 対象ユーザーのストリームを購読 + const stream = apiContext.streams.get(EventIdHelper.buildEventId(['stream', 'user-timeline-status', sourceUserId.toString()])); + if (stream != null) { + stream.addSource(targetUserId.toString()); // この操作は冪等 + } +}); + +// イベント受信とその処理を追加: event.following.user unfollow +const eventReciever = new RedisEventReciever('frost-api'); +eventReciever.addListener((data) => { + // 対象ユーザーのストリームを購読解除 + const stream = apiContext.streams.get(EventIdUtil.buildEventId(['stream', 'user-timeline-status', soruceUser._id.toString()])); + if (stream != null) { + stream.removeSource(targetUser._id.toString()); + } +}); + + + + + // 各種ストリームに発行 + const publisher = new LocalStreamPublisher(); + await Promise.all([ + publisher.publish('user-timeline-status', apiContext.user._id.toString(), serializedPostStatus), + publisher.publish('general-timeline-status', 'general', serializedPostStatus) + ]); + await publisher.dispose(); + */ /** - * @param {Map} streams + * @param {Map} streams * @param {MongoAdapter} repository */ module.exports = (http, directoryRouter, streams, repository, config) => { const server = new WebSocket.server({ httpServer: http }); // generate stream for general timeline (global) - const generalTLStream = new Stream(); + const generalTLStream = new LocalStream(); const generalTLStreamType = 'general-timeline-status'; - const generalTLStreamId = StreamEventIdUtil.buildStreamEventId(generalTLStreamType, 'general'); + const generalTLStreamId = EventIdHelper.buildEventId(['stream', generalTLStreamType, 'general']); generalTLStream.addSource(generalTLStreamId); streams.set(generalTLStreamId, generalTLStream); @@ -55,7 +92,7 @@ module.exports = (http, directoryRouter, streams, repository, config) => { if (stream.listenerCount() == 0) { // general-timeline-statusはストリーム自体の解放は行わない - const { streamType } = StreamEventIdUtil.parseStreamEventId(streamId); + const { streamType } = EventIdHelper.parseEventId(streamId); if (streamType == 'general-timeline-status') { return; } @@ -202,7 +239,7 @@ module.exports = (http, directoryRouter, streams, repository, config) => { else if (timelineType == 'home') { // memo: フォローユーザーのuser-timeline-statusストリームを統合したhome-timeline-statusストリームを生成 streamType = 'home-timeline-status'; - streamId = StreamEventIdUtil.buildStreamEventId(streamType, connection.user._id); + streamId = EventIdHelper.buildEventId(['stream', streamType, connection.user._id]); // expect: Not connected to the stream yet from this connection. if (connection.connectedStreamIds.indexOf(streamId) != -1) { @@ -213,11 +250,11 @@ module.exports = (http, directoryRouter, streams, repository, config) => { if (stream == null) { // ストリームを生成 stream = new Stream(); - stream.addSource(StreamEventIdUtil.buildStreamEventId('user-timeline-status', connection.user._id)); + stream.addSource(EventIdHelper.buildEventId(['stream', 'user-timeline-status', connection.user._id])); const followings = await userFollowingsService.findTargets(connection.user._id, { isAscending: false }); // TODO: (全て or ユーザーの購読設定によっては選択的に) for (const following of followings || []) { const followingUserId = following.target.toString(); - stream.addSource(StreamEventIdUtil.buildStreamEventId('user-timeline-status', followingUserId)); + stream.addSource(EventIdHelper.buildEventId(['stream', 'user-timeline-status', followingUserId])); } streams.set(streamId, stream); } @@ -226,7 +263,7 @@ module.exports = (http, directoryRouter, streams, repository, config) => { return connection.error('timeline-connect', `timeline type "${timelineType}" is invalid`); } - // ストリームからのデータをwebsocketに流す + // LocalStreamからのデータをwebsocketに流す const streamHandler = stream.addListener(data => { if (connection.connected) { console.log(`streaming/stream:${streamType}`); @@ -264,7 +301,7 @@ module.exports = (http, directoryRouter, streams, repository, config) => { streamId = generalTLStreamId; } else if (timelineType == 'home') { - streamId = StreamEventIdUtil.buildStreamEventId('home-timeline-status', connection.user._id); + streamId = EventIdHelper.buildEventId(['stream', 'home-timeline-status', connection.user._id]); } else { return connection.error('timeline-disconnect', `timeline type "${timelineType}" is invalid`);