Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
marihachi committed Oct 16, 2018
1 parent 83d1630 commit fb9153c
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 330 deletions.
30 changes: 30 additions & 0 deletions src/modules/helpers/EventIdHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class EventIdHelper {
/**
* @param {string} eventType
* @param {string[]} params
*/
static buildEventId(params) {
return params.join('.');
}
/** @param {string} eventId */
static parseEventId(eventId) {
return eventId.split('.');
}
/**
* @param {string} eventId
* @param {string[]} partialParams
*/
static contain(eventId, partialParams) {
const params = EventIdUtil.parseEventId(eventId);
if (partialParams > params) {
return false;
}
for (let i = 0; i < partialParams.length; i++) {
if (partialParams[i] != params[i]) {
return false;
}
}
return true;
}
}
module.exports = EventIdHelper;
92 changes: 92 additions & 0 deletions src/modules/localStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
const XevPubSub = require('./XevPubSub');
const { EventEmitter } = require('events');
const EventIdHelper = require('./helpers/EventIdHelper');

class LocalStream {
/** @param {redis.RedisClient} redisClient */
constructor() {
this.sources = [];
this.emitter = new EventEmitter();
this.xev = new XevPubSub('frost-api');
this.xev.on('message', (channel, message) => {
// 自身のリスナーに対して投げる
this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message));
// 設定した別のStreamに投げる
if (this.outgoingStreamId != null) {
this.xev.publish(this.outgoingStreamId, message);
}
});
}
setDestination(streamId) {
this.outgoingStreamId = streamId;
}
unsetDestination() {
this.outgoingStreamId = null;
}
getSources() {
return this.sources;
}
/** @param {string} streamId */
addSource(streamId) {
if (this.sources.indexOf(streamId) != -1) {
throw new Error('already added');
}
this.xev.subscribe(streamId);
this.sources.push(streamId);
}
/** @param {string} streamId */
removeSource(streamId) {
const index = this.sources.indexOf(streamId);
if (index == -1) {
throw new Error('not exist');
}
this.xev.unsubscribe(streamId);
this.sources.splice(index, 1);
}
/**
* @param {(data: string | {[x:string]:any})=>void} listener
* @returns listener
*/
addListener(listener) {
this.emitter.addListener('data', listener);
return listener;
}
/** @param {(data: string | {[x:string]:any})=>void} listener */
removeListener(listener) {
this.emitter.removeListener('data', listener);
}
listenerCount() {
return this.emitter.listenerCount('data');
}
dispose() {
this.sources.map(i => this.removeSource(i));
this.xev.dispose();
this.xev.removeAllListeners();
this.emitter.removeAllListeners();
}
}

class LocalStreamPublisher {
constructor() {
this.xev = new XevPubSub('frost-api');
}
/**
* @param {string} type
* @param {string} publisherId
* @param {string | {[x:string]:any}} data JSON data or object
*/
publish(type, publisherId, data) {
let strData = (data instanceof String) ? data : JSON.stringify(data);
const streamEventId = EventIdHelper.buildEventId(['stream', type, publisherId]);
this.xev.publish(streamEventId, strData);
}
dispose() {
this.xev.removeAllListeners();
this.xev.dispose();
}
}

module.exports = {
LocalStream,
LocalStreamPublisher
};
115 changes: 115 additions & 0 deletions src/modules/redisEvent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
const redis = require('redis');
const { EventEmitter } = require('events');

class RedisEventReciever {
/** @param {redis.RedisClient} redisClient */
constructor(namespace, redisOptions = { host: 'localhost', port: 6379 }) {
this.namespace = namespace;
this.emitter = new EventEmitter();
this.redis = redis.createClient(redisOptions);

this.redis.on('message', (channel, message) => {
// 自身のリスナーに対して投げる
this.emitter.emit('data', (message instanceof String) ? message : JSON.parse(message));
});
this.redis.on('error', (err) => {
throw new Error(`redis(reciever): ${String(err)}`);
});

this.redis.subscribe(this.namespace, (err) => {
if (err) {
throw new Error('redis: failed to subscribe');
}
});
}
/**
* @param {(data: string | {[x:string]:any})=>void} listener
* @returns listener
*/
addListener(listener) {
this.emitter.addListener('data', listener);
return listener;
}
/** @param {(data: string | {[x:string]:any})=>void} listener */
removeListener(listener) {
this.emitter.removeListener('data', listener);
}
listenerCount() {
return this.emitter.listenerCount('data');
}
/** @returns {Promise<void>} */
async dispose() {
const disposeRedis = () => new Promise((resolve, reject) => {
if (this.redis.connected) {
this.redis.quit((err) => {
if (err) {
return reject(err);
}
resolve();
});
}
});
const unsubscribe = () => new Promise((resolve, reject) => {
this.redis.unsubscribe(this.namespace, (err) => {
if (err) {
return reject(err);
}
resolve();
});
});

await unsubscribe();
await disposeRedis();
this.redis.removeAllListeners();
this.emitter.removeAllListeners();
}
}

class RedisEventSender {
constructor(namespace, redisOptions = { host: 'localhost', port: 6379 }) {
this.namespace = namespace;
this.redis = redis.createClient(redisOptions);
this.redis.on('error', (err) => {
throw new Error(`redis(sender): ${String(err)}`);
});
}
/**
* @param {string} type
* @param {string} publisherId
* @param {string | {[x:string]:any}} data JSON data or object
*/
publish(eventId, data) {
return new Promise((resolve, reject) => {
let strData = JSON.stringify({ eventId, data });
this.redis.publish(this.namespace, strData, (err) => {
if (err) {
return reject(err);
}
resolve();
});
});
}
async dispose() {
const dispose = () => new Promise((resolve, reject) => {
if (this.redis.connected) {
this.redis.quit((err) => {
if (err) {
return reject(err);
}
resolve();
});
}
else {
resolve();
}
});

await dispose();
this.redis.removeAllListeners();
}
}

module.exports = {
RedisEventReciever,
RedisEventSender
};
Loading

0 comments on commit fb9153c

Please sign in to comment.