fix: Improve cluster connection pool logic when disconnecting (#5)
* Tell Redis cluster to disable protected mode before running tests

Signed-off-by: Martin Slota <[email protected]>

* Try to enable Redis cluster tests on CI

Signed-off-by: Martin Slota <[email protected]>

* Add a failing test around Redis cluster disconnection logic

Signed-off-by: Martin Slota <[email protected]>

* Rename function parameter

Signed-off-by: Martin Slota <[email protected]>

* Turn node error listener into an arrow function so that `this` points to the connection pool instance

Signed-off-by: Martin Slota <[email protected]>

* Extract node listeners into separate constants

Signed-off-by: Martin Slota <[email protected]>

* Keep track of listeners along with each Redis client (see the sketch after this change list)

Signed-off-by: Martin Slota <[email protected]>

* Remove node listeners when the node is being removed

Signed-off-by: Martin Slota <[email protected]>

* Emit node removal events whenever a node is removed

Signed-off-by: Martin Slota <[email protected]>

* When resetting, add nodes before removing old ones

Signed-off-by: Martin Slota <[email protected]>

* Rename Node type to NodeRecord for clarity

Signed-off-by: Martin Slota <[email protected]>

* Also rename the field holding node records

Signed-off-by: Martin Slota <[email protected]>

* Rename variable to nodeRecord

Signed-off-by: Martin Slota <[email protected]>

* Rename another variable to nodeRecord

Signed-off-by: Martin Slota <[email protected]>

* Fix a reference to connection pool nodes

Signed-off-by: Martin Slota <[email protected]>

* Do not fail when retrieving a node by a non-existing key

Signed-off-by: Martin Slota <[email protected]>

* Revert "Fix a reference to connection pool nodes"

This reverts commit 6536e22.

Signed-off-by: Martin Slota <[email protected]>

* Fix a reference to connection pool nodes, this time a bit more correctly

Signed-off-by: Martin Slota <[email protected]>

* Add a valid slots table to the mock server in tests that expect to connect using the Cluster client

Signed-off-by: Martin Slota <[email protected]>

* Do not assume that node removal will occur *after* refreshSlotsCache() is finished

Signed-off-by: Martin Slota <[email protected]>

---------

Signed-off-by: Martin Slota <[email protected]>
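
The core pattern behind the listener changes above, as a minimal standalone sketch. `Pool`, `ClientRecord`, `add()`, and `remove()` are illustrative stand-ins, not the actual ioredis API — the real change is in lib/cluster/ConnectionPool.ts below:

import { EventEmitter } from "events";

type ClientRecord = {
  client: EventEmitter; // stands in for a Redis connection
  endListener: () => void;
  errorListener: (error: unknown) => void;
};

class Pool extends EventEmitter {
  private records: { [key: string]: ClientRecord } = {};

  add(key: string, client: EventEmitter): void {
    // Arrow functions keep `this` bound to the pool; a plain `function`
    // handler would have `this` rebound to the emitting client.
    const endListener = () => this.remove(key);
    const errorListener = (error: unknown) => this.emit("nodeError", error, key);
    this.records[key] = { client, endListener, errorListener };
    client.once("end", endListener);
    client.on("error", errorListener);
  }

  remove(key: string): void {
    const record = this.records[key];
    if (!record) return; // tolerate unknown keys instead of throwing
    // Storing the listener references is what makes removal possible:
    // removeListener() needs the exact functions that were registered.
    record.client.removeListener("end", record.endListener);
    record.client.removeListener("error", record.errorListener);
    delete this.records[key];
    this.emit("-node", record.client, key);
    if (!Object.keys(this.records).length) this.emit("drain");
  }
}
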
martinslota authored Jun 13, 2024
1 parent 11b3cbb commit 2733aee
Showing 8 changed files with 129 additions and 71 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
@@ -35,12 +35,12 @@ jobs:
           flag-name: node-${{matrix.node}}
           parallel: true
 
-  # test-cluster:
-  #   runs-on: ubuntu-latest
-  #   steps:
-  #     - uses: actions/checkout@v2
-  #     - name: Build and test cluster
-  #       run: bash test/cluster/docker/main.sh
+  test-cluster:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - name: Build and test cluster
+        run: bash test/cluster/docker/main.sh
 
   code-coverage:
     needs: test
2 changes: 1 addition & 1 deletion lib/Pipeline.ts
@@ -343,7 +343,7 @@ Pipeline.prototype.exec = function (callback: Callback): Promise<Array<any>> {
   if (_this.isCluster) {
     node = {
       slot: pipelineSlot,
-      redis: _this.redis.connectionPool.nodes.all[_this.preferKey],
+      redis: _this.redis.connectionPool.getInstanceByKey(_this.preferKey),
     };
   }
 
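
The pipeline previously reached into the pool's internal nodes map, which silently broke when that field was renamed; routing through getInstanceByKey() keeps the pool's internals private. The optional chaining the accessor gains in this commit is what the "non-existing key" fix is about — a minimal sketch with hypothetical names:

type NodeRecord = { redis: { options: object } };
const all: { [key: string]: NodeRecord } = {};

// all["nope"].redis would throw "Cannot read properties of undefined";
// the ?. lets a miss surface as undefined that callers can handle.
const redis = all["nope"]?.redis;
console.log(redis); // undefined
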
106 changes: 60 additions & 46 deletions lib/cluster/ConnectionPool.ts
@@ -7,9 +7,15 @@ const debug = Debug("cluster:connectionPool");
 
 type NODE_TYPE = "all" | "master" | "slave";
 
+type NodeRecord = {
+  redis: Redis;
+  endListener: () => void;
+  errorListener: (error: unknown) => void;
+};
+
 export default class ConnectionPool extends EventEmitter {
   // master + slave = all
-  private nodes: { [key in NODE_TYPE]: { [key: string]: Redis } } = {
+  private nodeRecords: { [key in NODE_TYPE]: { [key: string]: NodeRecord } } = {
     all: {},
     master: {},
     slave: {},
@@ -22,51 +28,51 @@ export default class ConnectionPool extends EventEmitter {
   }

   getNodes(role: NodeRole = "all"): Redis[] {
-    const nodes = this.nodes[role];
-    return Object.keys(nodes).map((key) => nodes[key]);
+    const nodeRecords = this.nodeRecords[role];
+    return Object.keys(nodeRecords).map((key) => nodeRecords[key].redis);
   }
 
   getInstanceByKey(key: NodeKey): Redis {
-    return this.nodes.all[key];
+    return this.nodeRecords.all[key]?.redis;
   }
 
   getSampleInstance(role: NodeRole): Redis {
-    const keys = Object.keys(this.nodes[role]);
+    const keys = Object.keys(this.nodeRecords[role]);
     const sampleKey = sample(keys);
-    return this.nodes[role][sampleKey];
+    return this.nodeRecords[role][sampleKey].redis;
   }
 
   /**
    * Find or create a connection to the node
    */
-  findOrCreate(node: RedisOptions, readOnly = false): Redis {
-    const key = getNodeKey(node);
+  findOrCreate(redisOptions: RedisOptions, readOnly = false): NodeRecord {
+    const key = getNodeKey(redisOptions);
     readOnly = Boolean(readOnly);
 
     if (this.specifiedOptions[key]) {
-      Object.assign(node, this.specifiedOptions[key]);
+      Object.assign(redisOptions, this.specifiedOptions[key]);
     } else {
-      this.specifiedOptions[key] = node;
+      this.specifiedOptions[key] = redisOptions;
     }
 
-    let redis: Redis;
-    if (this.nodes.all[key]) {
-      redis = this.nodes.all[key];
-      if (redis.options.readOnly !== readOnly) {
-        redis.options.readOnly = readOnly;
+    let nodeRecord: NodeRecord;
+    if (this.nodeRecords.all[key]) {
+      nodeRecord = this.nodeRecords.all[key];
+      if (nodeRecord.redis.options.readOnly !== readOnly) {
+        nodeRecord.redis.options.readOnly = readOnly;
         debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
-        redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
+        nodeRecord.redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
         if (readOnly) {
-          delete this.nodes.master[key];
-          this.nodes.slave[key] = redis;
+          delete this.nodeRecords.master[key];
+          this.nodeRecords.slave[key] = nodeRecord;
        } else {
-          delete this.nodes.slave[key];
-          this.nodes.master[key] = redis;
+          delete this.nodeRecords.slave[key];
+          this.nodeRecords.master[key] = nodeRecord;
        }
      }
    } else {
      debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
-      redis = new Redis(
+      const redis = new Redis(
        defaults(
          {
            // Never try to reconnect when a node is lost,
@@ -79,30 +85,30 @@ export default class ConnectionPool extends EventEmitter {
            enableOfflineQueue: true,
            readOnly: readOnly,
          },
-          node,
+          redisOptions,
          this.redisOptions,
          { lazyConnect: true }
        )
      );
-      this.nodes.all[key] = redis;
-      this.nodes[readOnly ? "slave" : "master"][key] = redis;
-
-      redis.once("end", () => {
+      const endListener = () => {
        this.removeNode(key);
-        this.emit("-node", redis, key);
-        if (!Object.keys(this.nodes.all).length) {
-          this.emit("drain");
-        }
-      });
+      };
+      const errorListener = (error: unknown) => {
+        this.emit("nodeError", error, key);
+      };
+      nodeRecord = { redis, endListener, errorListener };
+
+      this.nodeRecords.all[key] = nodeRecord;
+      this.nodeRecords[readOnly ? "slave" : "master"][key] = nodeRecord;
+
+      redis.once("end", endListener);
 
      this.emit("+node", redis, key);
 
-      redis.on("error", function (error) {
-        this.emit("nodeError", error, key);
-      });
+      redis.on("error", errorListener);
    }
 
-    return redis;
+    return nodeRecord;
  }

   /**
@@ -122,29 +128,37 @@ export default class ConnectionPool extends EventEmitter {
       }
     });
 
-    Object.keys(this.nodes.all).forEach((key) => {
+    Object.keys(newNodes).forEach((key) => {
+      const node = newNodes[key];
+      this.findOrCreate(node, node.readOnly);
+    });
+    Object.keys(this.nodeRecords.all).forEach((key) => {
       if (!newNodes[key]) {
         debug("Disconnect %s because the node does not hold any slot", key);
-        this.nodes.all[key].disconnect();
+        this.nodeRecords.all[key].redis.disconnect();
         this.removeNode(key);
       }
     });
-    Object.keys(newNodes).forEach((key) => {
-      const node = newNodes[key];
-      this.findOrCreate(node, node.readOnly);
-    });
   }
 
   /**
    * Remove a node from the pool.
    */
   private removeNode(key: string): void {
-    const { nodes } = this;
-    if (nodes.all[key]) {
+    const { nodeRecords } = this;
+    const nodeRecord = nodeRecords.all[key];
+    if (nodeRecord) {
       debug("Remove %s from the pool", key);
-      delete nodes.all[key];
+      nodeRecord.redis.removeListener("end", nodeRecord.endListener);
+      nodeRecord.redis.removeListener("error", nodeRecord.errorListener);
+      delete nodeRecords.all[key];
+      delete nodeRecords.master[key];
+      delete nodeRecords.slave[key];
+
+      this.emit("-node", nodeRecord.redis, key);
+      if (!Object.keys(nodeRecords.all).length) {
+        this.emit("drain");
+      }
     }
-    delete nodes.master[key];
-    delete nodes.slave[key];
   }
 }
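
Two behavioral points in this file are easy to miss. First, "-node" and "drain" are now emitted from removeNode() itself, so they fire on every removal path, not only when a client's "end" event happens to arrive. Second, reset() now creates the new nodes before disconnecting stale ones. A minimal sketch of why that order matters, assuming a full topology change (the Set stands in for the pool; the keys are hypothetical):

const pool = new Set(["127.0.0.1:30001"]); // current nodes
const fresh = ["127.0.0.1:30002"]; // nodes from the new slot table

// Removing stale nodes first would empty the pool for a moment,
// firing "drain" and tearing the whole cluster connection down.
// Adding first guarantees the pool is never transiently empty:
for (const key of fresh) pool.add(key);
for (const key of [...pool]) {
  if (!fresh.includes(key)) pool.delete(key);
}
console.log([...pool]); // ["127.0.0.1:30002"]
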
21 changes: 21 additions & 0 deletions test/cluster/basic.ts
@@ -148,4 +148,25 @@ describe("cluster", () => {
       expect(await cluster2.get("prefix:foo")).to.eql("bar");
     });
   });
+
+  describe("disconnect and connect again", () => {
+    it("works 20 times in a row", async () => {
+      const cluster = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);
+
+      for (let i = 1; i <= 20; i++) {
+        await cluster.set("foo", `bar${i}`);
+
+        const endPromise = new Promise((resolve) =>
+          cluster.once("end", resolve)
+        );
+        await cluster.quit();
+        cluster.disconnect();
+        await endPromise;
+
+        cluster.connect();
+        expect(await cluster.get("foo")).to.equal(`bar${i}`);
+        await cluster.del("foo");
+      }
+    });
+  });
 });
14 changes: 13 additions & 1 deletion test/cluster/docker/main.sh
@@ -1,4 +1,16 @@
-docker run -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest &
+#!/bin/bash
+
+set -euo pipefail
+
+docker run --rm --name redis-cluster-ioredis-test -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest &
+trap 'docker stop redis-cluster-ioredis-test' EXIT
+
 npm install
+
 sleep 15
+
+for port in {30000..30005}; do
+  docker exec redis-cluster-ioredis-test /bin/bash -c "redis-cli -p $port CONFIG SET protected-mode no"
+done
+
 npm run test:js:cluster || npm run test:js:cluster || npm run test:js:cluster
7 changes: 6 additions & 1 deletion test/functional/cluster/connect.ts
@@ -391,7 +391,12 @@ describe("cluster:connect", () => {
 
   describe("multiple reconnect", () => {
     it("should reconnect after multiple consecutive disconnect(true) are called", (done) => {
-      new MockServer(30001);
+      const slotTable = [[0, 16383, ["127.0.0.1", 30001]]];
+      new MockServer(30001, (argv) => {
+        if (argv[0] === "cluster" && argv[1] === "SLOTS") {
+          return slotTable;
+        }
+      });
       const cluster = new Cluster([{ host: "127.0.0.1", port: "30001" }], {
         enableReadyCheck: false,
       });
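
The Cluster client only becomes ready once CLUSTER SLOTS returns at least one slot range, so the bare MockServer used before this commit was never connectable. Each reply entry has the shape [startSlot, endSlot, [host, port]] — an annotated restatement of the table above:

// Slots 0..16383 cover the entire Redis Cluster keyspace (16384 slots),
// so every key the test touches routes to the single mocked node:
const slotTable = [[0, 16383, ["127.0.0.1", 30001]]];

The same fix is applied to test/functional/cluster/disconnection.ts below.
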
7 changes: 6 additions & 1 deletion test/functional/cluster/disconnection.ts
@@ -9,7 +9,12 @@ describe("disconnection", () => {
   });
 
   it("should clear all timers on disconnect", (done) => {
-    const server = new MockServer(30000);
+    const slotTable = [[0, 16383, ["127.0.0.1", 30000]]];
+    const server = new MockServer(30000, (argv) => {
+      if (argv[0] === "cluster" && argv[1] === "SLOTS") {
+        return slotTable;
+      }
+    });
 
     const setIntervalCalls = sinon.spy(global, "setInterval");
     const clearIntervalCalls = sinon.spy(global, "clearInterval");
31 changes: 16 additions & 15 deletions test/functional/cluster/index.ts
@@ -370,27 +370,28 @@ describe("cluster", () => {
       });
       cluster.on("ready", () => {
         expect(cluster.nodes("master")).to.have.lengthOf(2);
+        expect(cluster.nodes("all")).to.have.lengthOf(3);
         slotTable = [
           [0, 5460, ["127.0.0.1", 30003]],
           [5461, 10922, ["127.0.0.1", 30002]],
         ];
-        cluster.refreshSlotsCache(() => {
-          cluster.once("-node", function (removed) {
-            expect(removed.options.port).to.eql(30001);
-            expect(cluster.nodes("master")).to.have.lengthOf(2);
-            expect(
-              [
-                cluster.nodes("master")[0].options.port,
-                cluster.nodes("master")[1].options.port,
-              ].sort()
-            ).to.eql([30002, 30003]);
-            cluster.nodes("master").forEach(function (node) {
-              expect(node.options).to.have.property("readOnly", false);
-            });
-            cluster.disconnect();
-            done();
+        cluster.once("-node", function (removed) {
+          expect(removed.options.port).to.eql(30001);
+          expect(cluster.nodes("master")).to.have.lengthOf(2);
+          expect(cluster.nodes("all")).to.have.lengthOf(2);
+          expect(
+            [
+              cluster.nodes("master")[0].options.port,
+              cluster.nodes("master")[1].options.port,
+            ].sort()
+          ).to.eql([30002, 30003]);
+          cluster.nodes("master").forEach(function (node) {
+            expect(node.options).to.have.property("readOnly", false);
+          });
+          cluster.disconnect();
+          done();
         });
+        cluster.refreshSlotsCache();
      });
    });
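
This test used to subscribe to "-node" inside refreshSlotsCache()'s callback; with removal events now emitted from removeNode(), the node can disappear before that callback runs, and the event would be missed. The fix is the general listen-before-trigger pattern — a minimal sketch with a hypothetical refresh():

import { EventEmitter } from "events";

const emitter = new EventEmitter();

// refresh() may emit at any point before its callback runs.
function refresh(done: () => void) {
  emitter.emit("-node", "127.0.0.1:30001");
  done();
}

// Subscribe first, then trigger, so the event cannot be missed:
emitter.once("-node", (key) => console.log("removed", key));
refresh(() => console.log("refresh finished"));
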
