Skip to content

Commit 97c84f0

Browse files
committed
realtime
1 parent 7e8d1c4 commit 97c84f0

File tree

3 files changed

+431
-253
lines changed

3 files changed

+431
-253
lines changed

apps/api/plane/bgtasks/issue_activities_task.py

Lines changed: 70 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,67 @@ def extract_ids(data: dict | None, primary_key: str, fallback_key: str) -> set[s
4242
return {str(x) for x in data.get(fallback_key, [])}
4343

4444

45+
def _deserialize_for_broadcast(raw_value):
46+
if raw_value is None:
47+
return None
48+
if isinstance(raw_value, (dict, list)):
49+
return raw_value
50+
if isinstance(raw_value, str):
51+
try:
52+
return json.loads(raw_value)
53+
except ValueError:
54+
return raw_value
55+
return str(raw_value)
56+
57+
58+
def broadcast_issue_event(
59+
*,
60+
project_id,
61+
issue_id,
62+
event_type,
63+
actor_id,
64+
epoch,
65+
requested_data,
66+
current_instance,
67+
):
68+
if not issue_id or not project_id:
69+
return
70+
71+
try:
72+
redis_client = redis_instance()
73+
except Exception as redis_error: # pragma: no cover - defensive guard
74+
log_exception(redis_error)
75+
return
76+
77+
if not redis_client:
78+
return
79+
80+
channel_id = str(project_id)
81+
payload = {
82+
"type": event_type,
83+
"issue_id": str(issue_id),
84+
"project_id": channel_id,
85+
"actor_id": str(actor_id) if actor_id else None,
86+
"timestamp": int(epoch) if isinstance(epoch, (int, float)) else None,
87+
}
88+
89+
requested_payload = _deserialize_for_broadcast(requested_data)
90+
if requested_payload is not None:
91+
payload["requested_data"] = requested_payload
92+
93+
current_payload = _deserialize_for_broadcast(current_instance)
94+
if current_payload is not None:
95+
payload["current_instance"] = current_payload
96+
97+
try:
98+
redis_client.publish(
99+
f"issue_events:{channel_id}",
100+
json.dumps(payload, default=str),
101+
)
102+
except Exception as publish_error: # pragma: no cover - defensive guard
103+
log_exception(publish_error)
104+
105+
45106
# Track Changes in name
46107
def track_name(
47108
requested_data,
@@ -1580,45 +1641,15 @@ def issue_activity(
15801641
issue_activities_created = IssueActivity.objects.bulk_create(issue_activities)
15811642

15821643
# Broadcast project issue updates for realtime listeners
1583-
if issue_id:
1584-
try:
1585-
ri = redis_instance()
1586-
project_channel = str(project_id)
1587-
payload = {
1588-
"type": type,
1589-
"issue_id": str(issue_id),
1590-
"project_id": project_channel,
1591-
"actor_id": str(actor_id) if actor_id else None,
1592-
"timestamp": int(epoch) if isinstance(epoch, (int, float)) else None,
1593-
}
1594-
1595-
def _deserialize(raw_value):
1596-
if raw_value is None:
1597-
return None
1598-
if isinstance(raw_value, (dict, list)):
1599-
return raw_value
1600-
if isinstance(raw_value, str):
1601-
try:
1602-
return json.loads(raw_value)
1603-
except ValueError:
1604-
return raw_value
1605-
return str(raw_value)
1606-
1607-
requested_payload = _deserialize(requested_data)
1608-
if requested_payload is not None:
1609-
payload["requested_data"] = requested_payload
1610-
1611-
current_payload = _deserialize(current_instance)
1612-
if current_payload is not None:
1613-
payload["current_instance"] = current_payload
1614-
1615-
ri.publish(
1616-
f"issue_events:{project_channel}",
1617-
json.dumps(payload, default=str),
1618-
)
1619-
except Exception as publish_error:
1620-
# Failure to broadcast should not block primary flow
1621-
log_exception(publish_error)
1644+
broadcast_issue_event(
1645+
project_id=project_id,
1646+
issue_id=issue_id,
1647+
event_type=type,
1648+
actor_id=actor_id,
1649+
epoch=epoch,
1650+
requested_data=requested_data,
1651+
current_instance=current_instance,
1652+
)
16221653

16231654
if notification:
16241655
notifications.delay(

apps/live/src/controllers/issue-events.controller.ts

Lines changed: 114 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type Redis from "ioredis";
12
import type { Request } from "express";
23
import type WebSocket from "ws";
34
// plane imports
@@ -8,104 +9,169 @@ import { redisManager } from "@/redis";
89
// auth
910
import { handleAuthentication } from "@/lib/auth";
1011

11-
@Controller("/issues")
12-
export class IssueEventsController {
13-
@WSDecorator("/")
14-
async handleConnection(ws: WebSocket, req: Request) {
15-
const query = req.query as Record<string, string | string[]>;
12+
type TokenPayload = {
13+
id?: string;
14+
cookie?: string;
15+
};
1616

17-
const projectIdRaw = query.projectId;
18-
const workspaceSlugRaw = query.workspaceSlug;
19-
const tokenRaw = query.token;
17+
type ConnectionParams = {
18+
projectId: string;
19+
workspaceSlug: string;
20+
token: string;
21+
};
2022

21-
const projectId = Array.isArray(projectIdRaw) ? projectIdRaw[0] : projectIdRaw;
22-
const workspaceSlug = Array.isArray(workspaceSlugRaw) ? workspaceSlugRaw[0] : workspaceSlugRaw;
23-
const token = Array.isArray(tokenRaw) ? tokenRaw[0] : tokenRaw;
23+
const getFirstQueryValue = (value?: string | string[]) => (Array.isArray(value) ? value[0] : value);
2424

25-
if (!projectId || !workspaceSlug || !token) {
26-
ws.close(4001, "Missing required parameters");
27-
return;
25+
const extractConnectionParams = (req: Request): ConnectionParams | null => {
26+
const query = req.query as Record<string, string | string[]>;
27+
const projectId = getFirstQueryValue(query.projectId);
28+
const workspaceSlug = getFirstQueryValue(query.workspaceSlug);
29+
const token = getFirstQueryValue(query.token);
30+
31+
if (!projectId || !workspaceSlug || !token) {
32+
return null;
33+
}
34+
35+
return { projectId, workspaceSlug, token };
36+
};
37+
38+
const parseToken = (rawToken: string): TokenPayload | null => {
39+
try {
40+
const parsed: unknown = JSON.parse(rawToken);
41+
if (!parsed || typeof parsed !== "object") {
42+
return null;
2843
}
44+
return parsed as TokenPayload;
45+
} catch (error) {
46+
logger.error("Invalid token payload for issue events", error);
47+
return null;
48+
}
49+
};
2950

30-
let parsedToken: { id?: string; cookie?: string } | undefined;
51+
const closeSocket = (ws: WebSocket, code: number, reason: string) => {
52+
if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
53+
return;
54+
}
55+
56+
try {
57+
ws.close(code, reason);
58+
} catch (error) {
59+
logger.error("Issue events websocket close failure", error);
60+
}
61+
};
62+
63+
const ensureAuthenticated = async (ws: WebSocket, token: TokenPayload, req: Request) => {
64+
const cookie = token?.cookie || req.headers.cookie || "";
65+
if (cookie) {
3166
try {
32-
parsedToken = JSON.parse(token);
67+
await handleAuthentication({
68+
cookie,
69+
userId: token?.id ?? "",
70+
});
71+
return true;
3372
} catch (error) {
34-
logger.error("Invalid token payload for issue events", error);
35-
ws.close(4002, "Invalid token");
73+
logger.error("Failed to authenticate issue events connection", error);
74+
closeSocket(ws, 4003, "Unauthorized");
75+
return false;
76+
}
77+
}
78+
79+
if (!token?.id) {
80+
closeSocket(ws, 4003, "Unauthorized");
81+
return false;
82+
}
83+
84+
return true;
85+
};
86+
87+
@Controller("/issues")
88+
export class IssueEventsController {
89+
@WSDecorator("/")
90+
async handleConnection(ws: WebSocket, req: Request) {
91+
const params = extractConnectionParams(req);
92+
if (!params) {
93+
closeSocket(ws, 4001, "Missing required parameters");
3694
return;
3795
}
3896

39-
const cookieString = parsedToken?.cookie || req.headers.cookie || "";
40-
if (cookieString) {
41-
try {
42-
await handleAuthentication({
43-
cookie: cookieString,
44-
userId: parsedToken?.id ?? "",
45-
});
46-
} catch (error) {
47-
logger.error("Failed to authenticate issue events connection", error);
48-
ws.close(4003, "Unauthorized");
49-
return;
50-
}
51-
} else if (!parsedToken?.id) {
52-
ws.close(4003, "Unauthorized");
97+
const tokenPayload = parseToken(params.token);
98+
if (!tokenPayload) {
99+
closeSocket(ws, 4002, "Invalid token");
100+
return;
101+
}
102+
103+
const authenticated = await ensureAuthenticated(ws, tokenPayload, req);
104+
if (!authenticated) {
53105
return;
54106
}
55107

56108
const redisClient = redisManager.getClient();
57109
if (!redisClient) {
58-
ws.close(1011, "Realtime service unavailable");
110+
closeSocket(ws, 1011, "Realtime service unavailable");
59111
return;
60112
}
61113

62-
const channel = `issue_events:${projectId}`;
63-
const subscriber = redisClient.duplicate();
64-
let cleanupInitiated = false;
114+
let subscriber: Redis;
115+
try {
116+
subscriber = redisClient.duplicate();
117+
} catch (error) {
118+
logger.error("Failed to create issue events redis subscriber", error);
119+
closeSocket(ws, 1011, "Realtime service unavailable");
120+
return;
121+
}
122+
123+
const channel = `issue_events:${params.projectId}`;
124+
let cleanupStarted = false;
65125

66126
const cleanup = async () => {
67-
if (cleanupInitiated) return;
68-
cleanupInitiated = true;
127+
if (cleanupStarted) return;
128+
cleanupStarted = true;
129+
69130
subscriber.removeAllListeners("message");
70131
subscriber.removeAllListeners("error");
132+
71133
try {
72134
await subscriber.unsubscribe(channel);
73135
} catch (error) {
74136
logger.error("Failed to unsubscribe issue events channel", error);
75137
}
76-
subscriber.disconnect();
138+
139+
try {
140+
subscriber.disconnect();
141+
} catch (error) {
142+
logger.error("Failed to disconnect issue events subscriber", error);
143+
}
77144
};
78145

79146
try {
80147
subscriber.on("error", (error) => {
81148
logger.error("Issue events redis subscriber error", error);
82-
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
83-
ws.close(1011, "Realtime service unavailable");
84-
}
149+
closeSocket(ws, 1011, "Realtime service unavailable");
85150
void cleanup();
86151
});
152+
87153
await subscriber.connect();
154+
88155
subscriber.on("message", (incomingChannel, message) => {
89156
if (incomingChannel === channel && ws.readyState === ws.OPEN) {
90157
ws.send(message);
91158
}
92159
});
160+
93161
await subscriber.subscribe(channel);
162+
94163
ws.on("close", () => {
95164
void cleanup();
96165
});
166+
97167
ws.on("error", (error) => {
98168
logger.error("Issue events websocket error", error);
99-
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
100-
ws.close(1011, "Issue events websocket error");
101-
}
169+
closeSocket(ws, 1011, "Issue events websocket error");
102170
void cleanup();
103171
});
104172
} catch (error) {
105173
logger.error("Failed to subscribe to issue events channel", error);
106-
if (ws.readyState === ws.OPEN || ws.readyState === ws.CONNECTING) {
107-
ws.close(1011, "Subscription failure");
108-
}
174+
closeSocket(ws, 1011, "Subscription failure");
109175
void cleanup();
110176
}
111177
}

0 commit comments

Comments
 (0)