Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve cluster connection pool logic when disconnecting #5

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
90039df
Tell Redis cluster to disable protected mode before running tests
martinslota Jun 10, 2024
8b54785
Try to enable Redis cluster tests on CI
martinslota Jun 10, 2024
da9b9f7
Add a failing test around Redis cluster disconnection logic
martinslota Jun 10, 2024
f0e5f66
Rename function parameter
martinslota Jun 10, 2024
78e2239
Turn node error listener into an arrow function so that `this` points to th…
martinslota Jun 10, 2024
fdb3db8
Extract node listeners into separate constants
martinslota Jun 10, 2024
d4cec9c
Keep track of listeners along with each Redis client
martinslota Jun 10, 2024
c767ee0
Remove node listeners when the node is being removed
martinslota Jun 10, 2024
43b8522
Emit node removal events whenever a node is removed
martinslota Jun 10, 2024
09d336d
When resetting, add nodes before removing old ones
martinslota Jun 10, 2024
f002230
Rename Node type to NodeRecord for clarity
martinslota Jun 10, 2024
41751c0
Also rename the field holding node records
martinslota Jun 10, 2024
c8fe73a
Rename variable to nodeRecord
martinslota Jun 10, 2024
21b947d
Rename another variable to nodeRecord
martinslota Jun 10, 2024
6536e22
Fix a reference to connection pool nodes
martinslota Jun 10, 2024
158c64c
Do not fail when retrieving a node by a non-existing key
martinslota Jun 10, 2024
788361c
Revert "Fix a reference to connection pool nodes"
martinslota Jun 10, 2024
0deaeba
Fix a reference to connection pool nodes, this time a bit more correctly
martinslota Jun 10, 2024
d5d85e9
Add a valid slots table to mock server in tests that expect to be abl…
martinslota Jun 10, 2024
d3f83c5
Do not assume that node removal will occur *after* refreshSlotsCache(…
martinslota Jun 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ jobs:
flag-name: node-${{matrix.node}}
parallel: true

# test-cluster:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v2
# - name: Build and test cluster
# run: bash test/cluster/docker/main.sh
test-cluster:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Build and test cluster
run: bash test/cluster/docker/main.sh

code-coverage:
needs: test
Expand Down
2 changes: 1 addition & 1 deletion lib/Pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ Pipeline.prototype.exec = function (callback: Callback): Promise<Array<any>> {
if (_this.isCluster) {
node = {
slot: pipelineSlot,
redis: _this.redis.connectionPool.nodes.all[_this.preferKey],
redis: _this.redis.connectionPool.getInstanceByKey(_this.preferKey),
};
}

Expand Down
106 changes: 60 additions & 46 deletions lib/cluster/ConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ const debug = Debug("cluster:connectionPool");

type NODE_TYPE = "all" | "master" | "slave";

type NodeRecord = {
redis: Redis;
endListener: () => void;
errorListener: (error: unknown) => void;
};

export default class ConnectionPool extends EventEmitter {
// master + slave = all
private nodes: { [key in NODE_TYPE]: { [key: string]: Redis } } = {
private nodeRecords: { [key in NODE_TYPE]: { [key: string]: NodeRecord } } = {
all: {},
master: {},
slave: {},
Expand All @@ -22,51 +28,51 @@ export default class ConnectionPool extends EventEmitter {
}

getNodes(role: NodeRole = "all"): Redis[] {
const nodes = this.nodes[role];
return Object.keys(nodes).map((key) => nodes[key]);
const nodeRecords = this.nodeRecords[role];
return Object.keys(nodeRecords).map((key) => nodeRecords[key].redis);
}

getInstanceByKey(key: NodeKey): Redis {
return this.nodes.all[key];
return this.nodeRecords.all[key]?.redis;
}

getSampleInstance(role: NodeRole): Redis {
const keys = Object.keys(this.nodes[role]);
const keys = Object.keys(this.nodeRecords[role]);
const sampleKey = sample(keys);
return this.nodes[role][sampleKey];
return this.nodeRecords[role][sampleKey].redis;
}

/**
* Find or create a connection to the node
*/
findOrCreate(node: RedisOptions, readOnly = false): Redis {
const key = getNodeKey(node);
findOrCreate(redisOptions: RedisOptions, readOnly = false): NodeRecord {
const key = getNodeKey(redisOptions);
readOnly = Boolean(readOnly);

if (this.specifiedOptions[key]) {
Object.assign(node, this.specifiedOptions[key]);
Object.assign(redisOptions, this.specifiedOptions[key]);
} else {
this.specifiedOptions[key] = node;
this.specifiedOptions[key] = redisOptions;
}

let redis: Redis;
if (this.nodes.all[key]) {
redis = this.nodes.all[key];
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly;
let nodeRecord: NodeRecord;
if (this.nodeRecords.all[key]) {
nodeRecord = this.nodeRecords.all[key];
if (nodeRecord.redis.options.readOnly !== readOnly) {
nodeRecord.redis.options.readOnly = readOnly;
debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
nodeRecord.redis[readOnly ? "readonly" : "readwrite"]().catch(noop);
if (readOnly) {
delete this.nodes.master[key];
this.nodes.slave[key] = redis;
delete this.nodeRecords.master[key];
this.nodeRecords.slave[key] = nodeRecord;
} else {
delete this.nodes.slave[key];
this.nodes.master[key] = redis;
delete this.nodeRecords.slave[key];
this.nodeRecords.master[key] = nodeRecord;
}
}
} else {
debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
redis = new Redis(
const redis = new Redis(
defaults(
{
// Never try to reconnect when a node is lost,
Expand All @@ -79,30 +85,30 @@ export default class ConnectionPool extends EventEmitter {
enableOfflineQueue: true,
readOnly: readOnly,
},
node,
redisOptions,
this.redisOptions,
{ lazyConnect: true }
)
);
this.nodes.all[key] = redis;
this.nodes[readOnly ? "slave" : "master"][key] = redis;

redis.once("end", () => {
const endListener = () => {
this.removeNode(key);
this.emit("-node", redis, key);
if (!Object.keys(this.nodes.all).length) {
this.emit("drain");
}
});
};
const errorListener = (error: unknown) => {
this.emit("nodeError", error, key);
};
nodeRecord = { redis, endListener, errorListener };

this.nodeRecords.all[key] = nodeRecord;
this.nodeRecords[readOnly ? "slave" : "master"][key] = nodeRecord;

redis.once("end", endListener);

this.emit("+node", redis, key);

redis.on("error", function (error) {
this.emit("nodeError", error, key);
});
redis.on("error", errorListener);
}

return redis;
return nodeRecord;
}

/**
Expand All @@ -122,29 +128,37 @@ export default class ConnectionPool extends EventEmitter {
}
});

Object.keys(this.nodes.all).forEach((key) => {
Object.keys(newNodes).forEach((key) => {
const node = newNodes[key];
this.findOrCreate(node, node.readOnly);
});
Object.keys(this.nodeRecords.all).forEach((key) => {
if (!newNodes[key]) {
debug("Disconnect %s because the node does not hold any slot", key);
this.nodes.all[key].disconnect();
this.nodeRecords.all[key].redis.disconnect();
this.removeNode(key);
}
});
Object.keys(newNodes).forEach((key) => {
const node = newNodes[key];
this.findOrCreate(node, node.readOnly);
});
}

/**
* Remove a node from the pool.
*/
private removeNode(key: string): void {
const { nodes } = this;
if (nodes.all[key]) {
const { nodeRecords } = this;
const nodeRecord = nodeRecords.all[key];
if (nodeRecord) {
debug("Remove %s from the pool", key);
delete nodes.all[key];
nodeRecord.redis.removeListener("end", nodeRecord.endListener);
nodeRecord.redis.removeListener("error", nodeRecord.errorListener);
delete nodeRecords.all[key];
delete nodeRecords.master[key];
delete nodeRecords.slave[key];

this.emit("-node", nodeRecord.redis, key);
if (!Object.keys(nodeRecords.all).length) {
this.emit("drain");
}
}
delete nodes.master[key];
delete nodes.slave[key];
}
}
21 changes: 21 additions & 0 deletions test/cluster/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,25 @@ describe("cluster", () => {
expect(await cluster2.get("prefix:foo")).to.eql("bar");
});
});

describe("disconnect and connect again", () => {
it("works 20 times in a row", async () => {
const cluster = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);

for (let i = 1; i <= 20; i++) {
await cluster.set("foo", `bar${i}`);

const endPromise = new Promise((resolve) =>
cluster.once("end", resolve)
);
await cluster.quit();
cluster.disconnect();
await endPromise;

cluster.connect();
expect(await cluster.get("foo")).to.equal(`bar${i}`);
await cluster.del("foo");
}
});
});
});
14 changes: 13 additions & 1 deletion test/cluster/docker/main.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
docker run -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest &
#!/bin/bash

set -euo pipefail

docker run --rm --name redis-cluster-ioredis-test -e "INITIAL_PORT=30000" -e "IP=0.0.0.0" -p 30000-30005:30000-30005 grokzen/redis-cluster:latest &
trap 'docker stop redis-cluster-ioredis-test' EXIT

npm install

sleep 15

for port in {30000..30005}; do
docker exec redis-cluster-ioredis-test /bin/bash -c "redis-cli -p $port CONFIG SET protected-mode no"
done

npm run test:js:cluster || npm run test:js:cluster || npm run test:js:cluster
7 changes: 6 additions & 1 deletion test/functional/cluster/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,12 @@ describe("cluster:connect", () => {

describe("multiple reconnect", () => {
it("should reconnect after multiple consecutive disconnect(true) are called", (done) => {
new MockServer(30001);
const slotTable = [[0, 16383, ["127.0.0.1", 30001]]];
new MockServer(30001, (argv) => {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return slotTable;
}
});
const cluster = new Cluster([{ host: "127.0.0.1", port: "30001" }], {
enableReadyCheck: false,
});
Expand Down
7 changes: 6 additions & 1 deletion test/functional/cluster/disconnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ describe("disconnection", () => {
});

it("should clear all timers on disconnect", (done) => {
const server = new MockServer(30000);
const slotTable = [[0, 16383, ["127.0.0.1", 30000]]];
const server = new MockServer(30000, (argv) => {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return slotTable;
}
});

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");
Expand Down
31 changes: 16 additions & 15 deletions test/functional/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,27 +370,28 @@ describe("cluster", () => {
});
cluster.on("ready", () => {
expect(cluster.nodes("master")).to.have.lengthOf(2);
expect(cluster.nodes("all")).to.have.lengthOf(3);
slotTable = [
[0, 5460, ["127.0.0.1", 30003]],
[5461, 10922, ["127.0.0.1", 30002]],
];
cluster.refreshSlotsCache(() => {
cluster.once("-node", function (removed) {
expect(removed.options.port).to.eql(30001);
expect(cluster.nodes("master")).to.have.lengthOf(2);
expect(
[
cluster.nodes("master")[0].options.port,
cluster.nodes("master")[1].options.port,
].sort()
).to.eql([30002, 30003]);
cluster.nodes("master").forEach(function (node) {
expect(node.options).to.have.property("readOnly", false);
});
cluster.disconnect();
done();
cluster.once("-node", function (removed) {
expect(removed.options.port).to.eql(30001);
expect(cluster.nodes("master")).to.have.lengthOf(2);
expect(cluster.nodes("all")).to.have.lengthOf(2);
expect(
[
cluster.nodes("master")[0].options.port,
cluster.nodes("master")[1].options.port,
].sort()
).to.eql([30002, 30003]);
cluster.nodes("master").forEach(function (node) {
expect(node.options).to.have.property("readOnly", false);
});
cluster.disconnect();
done();
});
cluster.refreshSlotsCache();
});
});
});
Expand Down
Loading