-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.js
99 lines (80 loc) · 2.45 KB
/
engine.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
const express = require("express");
const http = require("http");
const cors = require("cors");
const redis = require("redis");
const { Server } = require("socket.io");
// --- HTTP + Socket.IO stack -------------------------------------------------
// Express handles plain HTTP; Socket.IO shares the same underlying http server.
const app = express();
const server = http.createServer(app);

// Cross-origin policy applied to the Socket.IO endpoint.
const socketCorsOptions = {
  origin: "*",
  methods: ["GET", "POST"],
};
const io = new Server(server, { cors: socketCorsOptions });

// --- Shared runtime state ---------------------------------------------------
// Number of currently connected Socket.IO clients (broadcast on every change).
let connectedClients = 0;

// --- Redis ------------------------------------------------------------------
// Host/port come from the environment, falling back to a local default Redis.
const REDIS_HOST = process.env.REDIS_HOST || "127.0.0.1";
const REDIS_PORT = process.env.REDIS_PORT || 6379;
const client = redis.createClient({
  url: `redis://${REDIS_HOST}:${REDIS_PORT}`,
});

// Per-socket subscription bookkeeping, keyed by socket.id.
const subscriptions = new Map();
/**
 * Connects the shared Redis client and a dedicated subscriber connection, then
 * registers the Socket.IO handlers that bridge Redis pub/sub to sockets.
 *
 * For every connected socket:
 *  - "subscribe" (channel): pattern-subscribes to `channel` and forwards each
 *    JSON-parsable message to that socket as
 *    "redis_message" `{ channel, data }`.
 *  - "unsubscribe" (channel): removes only this socket's listener.
 *  - "disconnect": updates the client count and releases every listener the
 *    socket still holds.
 *
 * Bookkeeping is kept in the module-level `subscriptions` map as
 * socket.id -> Map(channel pattern -> listener function). The listener
 * reference is stored so it can be handed back to `pUnsubscribe`; calling
 * `pUnsubscribe` without it on the shared subscriber would also drop the
 * listeners that other sockets registered for the same pattern.
 *
 * @returns {Promise<void>} Resolves once the handlers are registered.
 *   Connection failures are logged, not rethrown; in that case no Socket.IO
 *   handlers are registered.
 */
const connectRedisClients = async () => {
  try {
    await client
      .on("error", (err) => console.log("Redis Client Error:", err))
      .connect();

    const subscriber = client.duplicate();
    // The duplicate is a separate client; give it its own error listener so a
    // subscriber-side failure is logged instead of surfacing as an unhandled
    // "error" event.
    subscriber.on("error", (err) =>
      console.log("Redis Subscriber Error:", err)
    );
    await subscriber.connect();

    // Removes exactly one listener from the shared subscriber; never throws.
    const releaseListener = async (channel, listener) => {
      try {
        await subscriber.pUnsubscribe(channel, listener);
      } catch (err) {
        console.error(`Failed to unsubscribe from channel ${channel}:`, err);
      }
    };

    io.on("connection", (socket) => {
      console.log("A client connected");
      connectedClients++;
      io.emit("updateClientCount", connectedClients);

      socket.on("subscribe", async (channel) => {
        // The channel name comes straight from the client; reject anything
        // that is not a usable string before it reaches Redis.
        if (typeof channel !== "string" || channel === "") {
          console.error("Ignoring subscribe with invalid channel:", channel);
          return;
        }
        // NOTE(review): pSubscribe treats `channel` as a glob pattern, so a
        // client may send "*" and receive every message. Consider an
        // allow-list if channels carry sensitive data.
        console.log(`Client subscribed to channel: ${channel}`);

        let listeners = subscriptions.get(socket.id);
        if (!listeners) {
          listeners = new Map();
          subscriptions.set(socket.id, listeners);
        }
        if (listeners.has(channel)) {
          return; // already subscribed; avoid duplicate deliveries
        }

        const listener = (message, sourceChannel) => {
          console.log(
            `Received message from Redis channel ${sourceChannel}:`,
            message
          );
          try {
            const parsedMessage = JSON.parse(message);
            socket.emit("redis_message", {
              channel: sourceChannel,
              data: parsedMessage,
            });
          } catch (e) {
            console.error("Error parsing message:", e);
          }
        };

        // Record before awaiting so a rapid second "subscribe" for the same
        // channel is treated as a duplicate.
        listeners.set(channel, listener);
        try {
          await subscriber.pSubscribe(channel, listener);
        } catch (err) {
          // Roll back so the client can retry after a failed subscribe.
          if (listeners.get(channel) === listener) {
            listeners.delete(channel);
          }
          console.error(`Failed to subscribe to channel ${channel}:`, err);
        }
      });

      socket.on("unsubscribe", async (channel) => {
        console.log(`Client unsubscribed from channel: ${channel}`);
        const listeners = subscriptions.get(socket.id);
        if (!listeners) {
          return;
        }
        const listener = listeners.get(channel);
        if (!listener) {
          return;
        }
        listeners.delete(channel);
        await releaseListener(channel, listener);
      });

      socket.on("disconnect", async () => {
        connectedClients--;
        console.log("A client disconnected");
        io.emit("updateClientCount", connectedClients);

        const listeners = subscriptions.get(socket.id);
        subscriptions.delete(socket.id);
        if (!listeners) {
          return;
        }
        // Release every Redis listener still held by this socket so it stops
        // receiving messages and its closures can be garbage-collected.
        await Promise.all(
          [...listeners].map(([channel, listener]) =>
            releaseListener(channel, listener)
          )
        );
      });
    });
  } catch (err) {
    console.error("Redis Connection Error:", err);
  }
};
// Enable CORS for the Express routes.
app.use(cors());

// Start the Redis connections in the background; connectRedisClients logs its
// own connection errors, so the returned promise is intentionally not awaited.
void connectRedisClients();

// Begin accepting HTTP / Socket.IO traffic.
const announceListening = () => {
  console.log("Server is running on port 4000");
};
server.listen(4000, announceListening);