Skip to content

Commit 7e8d1c4

Browse files
committed
real-time
1 parent 298acb1 commit 7e8d1c4

File tree

5 files changed

+400
-1
lines changed

5 files changed

+400
-1
lines changed

apps/api/plane/bgtasks/issue_activities_task.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1579,6 +1579,47 @@ def issue_activity(
15791579
# Save all the values to database
15801580
issue_activities_created = IssueActivity.objects.bulk_create(issue_activities)
15811581

1582+
# Broadcast project issue updates for realtime listeners
1583+
if issue_id:
1584+
try:
1585+
ri = redis_instance()
1586+
project_channel = str(project_id)
1587+
payload = {
1588+
"type": type,
1589+
"issue_id": str(issue_id),
1590+
"project_id": project_channel,
1591+
"actor_id": str(actor_id) if actor_id else None,
1592+
"timestamp": int(epoch) if isinstance(epoch, (int, float)) else None,
1593+
}
1594+
1595+
def _deserialize(raw_value):
1596+
if raw_value is None:
1597+
return None
1598+
if isinstance(raw_value, (dict, list)):
1599+
return raw_value
1600+
if isinstance(raw_value, str):
1601+
try:
1602+
return json.loads(raw_value)
1603+
except ValueError:
1604+
return raw_value
1605+
return str(raw_value)
1606+
1607+
requested_payload = _deserialize(requested_data)
1608+
if requested_payload is not None:
1609+
payload["requested_data"] = requested_payload
1610+
1611+
current_payload = _deserialize(current_instance)
1612+
if current_payload is not None:
1613+
payload["current_instance"] = current_payload
1614+
1615+
ri.publish(
1616+
f"issue_events:{project_channel}",
1617+
json.dumps(payload, default=str),
1618+
)
1619+
except Exception as publish_error:
1620+
# Failure to broadcast should not block primary flow
1621+
log_exception(publish_error)
1622+
15821623
if notification:
15831624
notifications.delay(
15841625
type=type,

apps/live/src/controllers/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { CollaborationController } from "./collaboration.controller";
22
import { ConvertDocumentController } from "./convert-document.controller";
33
import { HealthController } from "./health.controller";
4+
import { IssueEventsController } from "./issue-events.controller";
45

5-
export const CONTROLLERS = [CollaborationController, ConvertDocumentController, HealthController];
6+
export const CONTROLLERS = [CollaborationController, ConvertDocumentController, HealthController, IssueEventsController];
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import type { Request } from "express";
2+
import type WebSocket from "ws";
3+
// plane imports
4+
import { Controller, WebSocket as WSDecorator } from "@plane/decorators";
5+
import { logger } from "@plane/logger";
6+
// redis
7+
import { redisManager } from "@/redis";
8+
// auth
9+
import { handleAuthentication } from "@/lib/auth";
10+
11+
@Controller("/issues")
12+
export class IssueEventsController {
13+
@WSDecorator("/")
14+
async handleConnection(ws: WebSocket, req: Request) {
15+
const query = req.query as Record<string, string | string[]>;
16+
17+
const projectIdRaw = query.projectId;
18+
const workspaceSlugRaw = query.workspaceSlug;
19+
const tokenRaw = query.token;
20+
21+
const projectId = Array.isArray(projectIdRaw) ? projectIdRaw[0] : projectIdRaw;
22+
const workspaceSlug = Array.isArray(workspaceSlugRaw) ? workspaceSlugRaw[0] : workspaceSlugRaw;
23+
const token = Array.isArray(tokenRaw) ? tokenRaw[0] : tokenRaw;
24+
25+
if (!projectId || !workspaceSlug || !token) {
26+
ws.close(4001, "Missing required parameters");
27+
return;
28+
}
29+
30+
let parsedToken: { id?: string; cookie?: string } | undefined;
31+
try {
32+
parsedToken = JSON.parse(token);
33+
} catch (error) {
34+
logger.error("Invalid token payload for issue events", error);
35+
ws.close(4002, "Invalid token");
36+
return;
37+
}
38+
39+
const cookieString = parsedToken?.cookie || req.headers.cookie || "";
40+
if (cookieString) {
41+
try {
42+
await handleAuthentication({
43+
cookie: cookieString,
44+
userId: parsedToken?.id ?? "",
45+
});
46+
} catch (error) {
47+
logger.error("Failed to authenticate issue events connection", error);
48+
ws.close(4003, "Unauthorized");
49+
return;
50+
}
51+
} else if (!parsedToken?.id) {
52+
ws.close(4003, "Unauthorized");
53+
return;
54+
}
55+
56+
const redisClient = redisManager.getClient();
57+
if (!redisClient) {
58+
ws.close(1011, "Realtime service unavailable");
59+
return;
60+
}
61+
62+
const channel = `issue_events:${projectId}`;
63+
const subscriber = redisClient.duplicate();
64+
let cleanupInitiated = false;
65+
66+
const cleanup = async () => {
67+
if (cleanupInitiated) return;
68+
cleanupInitiated = true;
69+
subscriber.removeAllListeners("message");
70+
subscriber.removeAllListeners("error");
71+
try {
72+
await subscriber.unsubscribe(channel);
73+
} catch (error) {
74+
logger.error("Failed to unsubscribe issue events channel", error);
75+
}
76+
subscriber.disconnect();
77+
};
78+
79+
try {
80+
subscriber.on("error", (error) => {
81+
logger.error("Issue events redis subscriber error", error);
82+
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
83+
ws.close(1011, "Realtime service unavailable");
84+
}
85+
void cleanup();
86+
});
87+
await subscriber.connect();
88+
subscriber.on("message", (incomingChannel, message) => {
89+
if (incomingChannel === channel && ws.readyState === ws.OPEN) {
90+
ws.send(message);
91+
}
92+
});
93+
await subscriber.subscribe(channel);
94+
ws.on("close", () => {
95+
void cleanup();
96+
});
97+
ws.on("error", (error) => {
98+
logger.error("Issue events websocket error", error);
99+
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
100+
ws.close(1011, "Issue events websocket error");
101+
}
102+
void cleanup();
103+
});
104+
} catch (error) {
105+
logger.error("Failed to subscribe to issue events channel", error);
106+
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
107+
ws.close(1011, "Subscription failure");
108+
}
109+
void cleanup();
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)