diff --git a/index.js b/index.js index 2553806..15b0ad6 100644 --- a/index.js +++ b/index.js @@ -12,17 +12,15 @@ function RedisPubSub(options) { options || (options = {}); this.client = options.client || redis.createClient(options); + this._clientConnection = null; // Redis doesn't allow the same connection to both listen to channels and do // operations. Make an extra redis connection for subscribing with the same // options if not provided this.observer = options.observer || redis.createClient(this.client.options); + this._observerConnection = null; - var pubsub = this; - this.observer.on('message', function(channel, message) { - var data = JSON.parse(message); - pubsub._emit(channel, data); - }); + this._connect(); } module.exports = RedisPubSub; @@ -37,27 +35,59 @@ RedisPubSub.prototype.close = function(callback) { var pubsub = this; PubSub.prototype.close.call(this, function(err) { if (err) return callback(err); - pubsub.client.quit(function(err) { - if (err) return callback(err); - pubsub.observer.quit(callback); - }); + pubsub._close().then(function() { + callback(); + }, callback); }); }; +RedisPubSub.prototype._close = function() { + return this._closing = this._closing || this._connect().then(Promise.all([ + this.client.quit(), + this.observer.quit() + ])); +}; + RedisPubSub.prototype._subscribe = function(channel, callback) { - this.observer.subscribe(channel, callback); + var pubsub = this; + pubsub.observer + .subscribe(channel, function(message) { + var data = JSON.parse(message); + pubsub._emit(channel, data); + }) + .then(function() { + callback(); + }, callback); }; RedisPubSub.prototype._unsubscribe = function(channel, callback) { - this.observer.unsubscribe(channel, callback); + this.observer.unsubscribe(channel) + .then(function() { + callback(); + }, callback); }; RedisPubSub.prototype._publish = function(channels, data, callback) { var message = JSON.stringify(data); - var args = [PUBLISH_SCRIPT, 0, message].concat(channels); - this.client.eval(args, callback); + var args = [message].concat(channels); + this.client.eval(PUBLISH_SCRIPT, {arguments: args}).then(function() { + callback(); + }, callback); }; +RedisPubSub.prototype._connect = function() { + this._clientConnection = this._clientConnection || connect(this.client); + this._observerConnection = this._observerConnection || connect(this.observer); + return Promise.all([ + this._clientConnection, + this._observerConnection + ]); +}; + +function connect(client) { + return client.isOpen ? Promise.resolve() : client.connect(); +} + var PUBLISH_SCRIPT = 'for i = 2, #ARGV do ' + 'redis.call("publish", ARGV[i], ARGV[1]) ' + diff --git a/package.json b/package.json index f4cded0..f669393 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,10 @@ { "name": "sharedb-redis-pubsub", - "version": "4.0.0", + "version": "5.0.0", "description": "Redis pub/sub adapter adapter for ShareDB", "main": "index.js", "dependencies": { - "redis": "^2.6.0 || ^3.0.0", + "redis": "^4.0.0", "sharedb": "^1.0.0 || ^2.0.0 || ^3.0.0 || ^4.0.0" }, "devDependencies": { diff --git a/test/test.js b/test/test.js index faf2d02..53a3891 100644 --- a/test/test.js +++ b/test/test.js @@ -1,5 +1,44 @@ var redisPubSub = require('../index'); +var redis = require('redis'); +var runTestSuite = require('sharedb/test/pubsub'); -require('sharedb/test/pubsub')(function(callback) { - callback(null, redisPubSub()); +describe('default options', function() { + runTestSuite(function(callback) { + callback(null, redisPubSub()); + }); +}); + +describe('unconnected client', function() { + runTestSuite(function(callback) { + callback(null, redisPubSub({ + client: redis.createClient() + })); + }); +}); + +describe('connected client', function() { + var client; + + beforeEach(function(done) { + client = redis.createClient(); + client.connect().then(function() { + done(); + }, done); + }); + + runTestSuite(function(callback) { + callback(null, redisPubSub({ + client: client + })); + }); +}); + +describe('connecting client', function() { + runTestSuite(function(callback) { + var client = redis.createClient(); + client.connect(); + callback(null, redisPubSub({ + client: client + })); + }); });