Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ui): socket yjs manager & refactor #649

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ui/src/lib/i18n/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
"Member has been successfully added to the workspace.": "",
"Member Removed": "",
"Member has been successfully removed from the workspace.": "",
"Connection Error": "",
"Empty workflow detected": "",
"You cannot create a deployment without a workflow.": "",
"Reload": ""
pyshx marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 1 addition & 0 deletions ui/src/lib/i18n/locales/es.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
"Member has been successfully added to the workspace.": "El miembro ha sido agregado exitosamente al espacio de trabajo.",
"Member Removed": "Miembro eliminado",
"Member has been successfully removed from the workspace.": "El miembro ha sido eliminado exitosamente del espacio de trabajo.",
"Connection Error": "",
pyshx marked this conversation as resolved.
Show resolved Hide resolved
"Empty workflow detected": "Se detectó un flujo de trabajo vacío",
"You cannot create a deployment without a workflow.": "No puedes crear un despliegue sin un flujo de trabajo.",
"Reload": "Recargar"
Expand Down
1 change: 1 addition & 0 deletions ui/src/lib/i18n/locales/fr.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
"Member has been successfully added to the workspace.": "Le membre a été ajouté avec succès à l'espace de travail.",
"Member Removed": "Membre supprimé",
"Member has been successfully removed from the workspace.": "Le membre a été supprimé avec succès de l'espace de travail.",
"Connection Error": "",
"Empty workflow detected": "Flux de travail vide détecté",
"You cannot create a deployment without a workflow.": "Vous ne pouvez pas créer de déploiement sans flux de travail.",
"Reload": "Recharger"
Expand Down
1 change: 1 addition & 0 deletions ui/src/lib/i18n/locales/ja.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
"Member has been successfully added to the workspace.": "メンバーがワークスペースに正常に追加されました。",
"Member Removed": "メンバー削除",
"Member has been successfully removed from the workspace.": "メンバーがワークスペースから正常に削除されました。",
"Connection Error": "",
pyshx marked this conversation as resolved.
Show resolved Hide resolved
"Empty workflow detected": "空のワークフローが検出されました",
"You cannot create a deployment without a workflow.": "ワークフローがないとデプロイメントを作成できません。",
"Reload": "リロード"
Expand Down
1 change: 1 addition & 0 deletions ui/src/lib/i18n/locales/zh.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
"Member has been successfully added to the workspace.": "成员已成功添加到工作区。",
"Member Removed": "成员已移除",
"Member has been successfully removed from the workspace.": "成员已成功从工作区中移除。",
"Connection Error": "",
"Empty workflow detected": "检测到空工作流",
"You cannot create a deployment without a workflow.": "没有工作流,您无法创建部署。",
"Reload": "重新加载"
Expand Down
4 changes: 4 additions & 0 deletions ui/src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ import { twMerge } from "tailwind-merge";
export function cn(...inputs: ClassValue[]) {
return twMerge(clsx(inputs));
}

export function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
303 changes: 303 additions & 0 deletions ui/src/lib/yjs/socketYjsManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
import * as Y from "yjs";

import { sleep } from "../utils";

import type { FlowMessage } from "./types";

export type AccessTokenProvider = () => Promise<string> | string;

export class SocketYjsManager {
protected ws!: WebSocket;
protected doc: Y.Doc;
protected socketReady = false;
protected firstSyncComplete = false;
protected accessTokenProvider: AccessTokenProvider | undefined;
protected projectId: string | undefined;
Comment on lines +14 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Make required fields non-optional or add proper null checks.

The accessTokenProvider and projectId fields are marked as optional but are used throughout the code without proper null checks. This could lead to runtime errors.

Apply this diff:

-  protected accessTokenProvider: AccessTokenProvider | undefined;
-  protected projectId: string | undefined;
+  private accessTokenProvider!: AccessTokenProvider;
+  private projectId!: string;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected accessTokenProvider: AccessTokenProvider | undefined;
protected projectId: string | undefined;
private accessTokenProvider!: AccessTokenProvider;
private projectId!: string;

protected onUpdateHandlers: ((update: Uint8Array) => void)[] = [];
protected reconnectAttempts = 0;
protected maxReconnectAttempts = 5;
protected reconnectDelay = 1000;
protected reconnectTimer: ReturnType<typeof setTimeout> | null = null;

Comment on lines +9 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider revising field visibility and optional types

The class has several issues in its field declarations:

  1. Protected visibility seems unnecessary for internal implementation
  2. Optional fields (accessTokenProvider and projectId) are used throughout the code without proper null checks

Consider this improvement:

-export class SocketYjsManager {
-  protected ws!: WebSocket;
-  protected doc: Y.Doc;
-  protected socketReady = false;
-  protected firstSyncComplete = false;
-  protected accessTokenProvider: AccessTokenProvider | undefined;
-  protected projectId: string | undefined;
-  protected onUpdateHandlers: ((update: Uint8Array) => void)[] = [];
+export class SocketYjsManager {
+  private ws!: WebSocket;
+  private doc: Y.Doc;
+  private socketReady = false;
+  private firstSyncComplete = false;
+  private accessTokenProvider!: AccessTokenProvider;
+  private projectId!: string;
+  private onUpdateHandlers: ((update: Uint8Array) => void)[] = [];

Committable suggestion skipped: line range outside the PR's diff.

constructor(doc: Y.Doc) {
this.doc = doc;

// Bind methods
this.onConnectionEstablished = this.onConnectionEstablished.bind(this);
this.onConnectionDisconnect = this.onConnectionDisconnect.bind(this);
this.onConnectionError = this.onConnectionError.bind(this);
this.onAuthenticateRequest = this.onAuthenticateRequest.bind(this);
this.onDocUpdate = this.onDocUpdate.bind(this);
this.onReady = this.onReady.bind(this);
this.onPeerUpdate = this.onPeerUpdate.bind(this);
this.handleMessage = this.handleMessage.bind(this);
this.reconnect = this.reconnect.bind(this);
}

public getDoc(): Y.Doc {
return this.doc;
}

// Replace the setupSocket method
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;

try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
Comment on lines +53 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add URL protocol validation

The URL protocol replacement assumes HTTP/HTTPS protocols. Add validation to ensure only supported protocols are processed.

       const wsUrl = new URL(data.url);
+      if (!wsUrl.protocol.startsWith('http')) {
+        throw new Error('URL protocol must be HTTP or HTTPS');
+      }
       wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
       wsUrl.pathname = `/${data.roomId}`;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
const wsUrl = new URL(data.url);
if (!wsUrl.protocol.startsWith('http')) {
throw new Error('URL protocol must be HTTP or HTTPS');
}
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;


// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
Comment on lines +58 to +60
Copy link
Contributor

@coderabbitai coderabbitai bot Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security: Avoid exposing sensitive data in URL parameters

The WebSocket URL includes sensitive information (user_id, project_id, token) as query parameters, which can be logged or exposed in various ways. Consider using headers or a more secure authentication mechanism.

Consider:

  1. Using a secure WebSocket handshake protocol
  2. Moving authentication to headers or a dedicated authentication message
  3. Implementing token refresh mechanism

Apply this diff as a starting point:

-      wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
-      wsUrl.searchParams.set("project_id", data.projectId);
-      wsUrl.searchParams.set("token", token);
+      // Initialize basic connection
+      this.ws = new WebSocket(wsUrl.href);
+      
+      // Send authentication message after connection
+      this.ws.addEventListener("open", () => {
+        this.ws.send(JSON.stringify({
+          type: "authenticate",
+          user_id: this.doc.clientID.toString(),
+          project_id: data.projectId,
+          token
+        }));
+      });

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pyshx Long term this is better probably. Make sure to look at this suggestion.
coderabbitai, I don't need you to do anything or reply.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines +58 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security: Move sensitive data from URL parameters to headers.

Exposing sensitive information like tokens and IDs in URL parameters is a security risk as they can be logged or exposed in various ways.

Consider using a secure WebSocket handshake protocol or moving authentication to a dedicated message after connection.


this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";

Comment on lines +62 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add connection timeout handling

The WebSocket connection attempt lacks a timeout, which could leave the application hanging if the server is unresponsive.

+      const connectionTimeout = setTimeout(() => {
+        if (this.ws.readyState === WebSocket.CONNECTING) {
+          this.ws.close();
+          throw new Error('WebSocket connection timeout');
+        }
+      }, 10000); // 10 seconds timeout
+
       this.ws = new WebSocket(wsUrl.href);
       this.ws.binaryType = "arraybuffer";
+
+      this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
const connectionTimeout = setTimeout(() => {
if (this.ws.readyState === WebSocket.CONNECTING) {
this.ws.close();
throw new Error('WebSocket connection timeout');
}
}, 10000); // 10 seconds timeout
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });

this.setupWebSocketListeners();
this.setupDocListeners();

console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
Comment on lines +51 to +72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in setupSocket

The error handling could be more specific and provide better context for debugging.

Consider this improvement:

   try {
     const token = await this.accessTokenProvider();
+    if (!token) {
+      throw new Error("Failed to obtain access token");
+    }
     const wsUrl = new URL(data.url);
     wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
     wsUrl.pathname = `/${data.roomId}`;

     // Add query parameters for authentication
     wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
     wsUrl.searchParams.set("project_id", data.projectId);
     wsUrl.searchParams.set("token", token);

     this.ws = new WebSocket(wsUrl.href);
     this.ws.binaryType = "arraybuffer";

     this.setupWebSocketListeners();
     this.setupDocListeners();

     console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
   } catch (error) {
-    console.error("Failed to setup WebSocket:", error);
+    const errorMessage = error instanceof Error ? error.message : "Unknown error";
+    console.error(`Failed to setup WebSocket: ${errorMessage}`);
+    if (error instanceof TypeError) {
+      throw new Error(`Invalid WebSocket URL: ${errorMessage}`);
+    }
     throw error;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
try {
const token = await this.accessTokenProvider();
if (!token) {
throw new Error("Failed to obtain access token");
}
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token);
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "Unknown error";
console.error(`Failed to setup WebSocket: ${errorMessage}`);
if (error instanceof TypeError) {
throw new Error(`Invalid WebSocket URL: ${errorMessage}`);
}
throw error;
}

}
Comment on lines +41 to +73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add input validation for setupSocket parameters

The method lacks validation for required parameters. URL construction should be more robust.

 async setupSocket(data: {
   url: string;
   roomId: string;
   projectId: string;
   accessTokenProvider: AccessTokenProvider;
 }) {
+  // Validate input parameters
+  if (!data.url || !data.roomId || !data.projectId) {
+    throw new Error('Required parameters missing');
+  }
+  
+  // Validate URL format
+  try {
+    new URL(data.url);
+  } catch (error) {
+    throw new Error('Invalid URL format');
+  }

   this.accessTokenProvider = data.accessTokenProvider;
   this.projectId = data.projectId;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Replace the setupSocket method
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
}
// Replace the setupSocket method
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
// Validate input parameters
if (!data.url || !data.roomId || !data.projectId) {
throw new Error('Required parameters missing');
}
// Validate URL format
try {
new URL(data.url);
} catch (error) {
throw new Error('Invalid URL format');
}
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
}


// Update the reconnect method
private reconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}

if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay =
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
pyshx marked this conversation as resolved.
Show resolved Hide resolved

this.reconnectTimer = setTimeout(async () => {
try {
if (this.ws) {
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid silent failures with empty projectId

Using empty strings as fallbacks for required parameters could lead to silent failures.

Consider throwing an error instead:

-              projectId: this.projectId || "",
-              accessTokenProvider: this.accessTokenProvider || (() => ""),
+              projectId: this.projectId ?? (() => {
+                throw new Error("Project ID is required for reconnection");
+              })(),
+              accessTokenProvider: this.accessTokenProvider ?? (() => {
+                throw new Error("Access token provider is required for reconnection");
+              }),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
projectId: this.projectId ?? (() => {
throw new Error("Project ID is required for reconnection");
})(),
accessTokenProvider: this.accessTokenProvider ?? (() => {
throw new Error("Access token provider is required for reconnection");
}),

});
}
} catch (error) {
Comment on lines +87 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add connection cleanup before reconnection

The reconnection attempt should clean up the existing connection before establishing a new one.

Consider this improvement:

   this.reconnectTimer = setTimeout(async () => {
     try {
       if (this.ws) {
+        // Clean up existing connection
+        this.ws.removeEventListener("open", this.onConnectionEstablished);
+        this.ws.removeEventListener("close", this.onConnectionDisconnect);
+        this.ws.removeEventListener("error", this.onConnectionError);
+        this.ws.removeEventListener("message", this.handleMessage);
+        if (this.ws.readyState === WebSocket.OPEN) {
+          this.ws.close();
+        }
+
         const originalUrl = new URL(this.ws.url);
         const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
         const roomId = originalUrl.pathname.slice(1);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
if (this.ws) {
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {
try {
if (this.ws) {
// Clean up existing connection
this.ws.removeEventListener("open", this.onConnectionEstablished);
this.ws.removeEventListener("close", this.onConnectionDisconnect);
this.ws.removeEventListener("error", this.onConnectionError);
this.ws.removeEventListener("message", this.handleMessage);
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.close();
}
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {

Comment on lines +86 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add WebSocket cleanup before reconnection

The reconnection attempt should properly close and clean up the existing WebSocket connection before creating a new one.

 this.reconnectTimer = setTimeout(async () => {
   try {
     if (this.ws) {
+      if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
+        this.ws.close();
+      }
+      // Wait for the connection to fully close
+      await new Promise(resolve => setTimeout(resolve, 100));
+
       const originalUrl = new URL(this.ws.url);
       const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
       const roomId = originalUrl.pathname.slice(1);

Committable suggestion skipped: line range outside the PR's diff.

console.error("Reconnection failed:", error);
}
}, delay);
}
}
Comment on lines +76 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add connection state tracking

The reconnection logic could benefit from explicit state tracking to prevent race conditions and multiple simultaneous reconnection attempts.

+  private connectionState: 'disconnected' | 'connecting' | 'connected' = 'disconnected';
+
   private reconnect() {
+    if (this.connectionState === 'connecting') {
+      return; // Prevent multiple simultaneous reconnection attempts
+    }
+
     if (this.reconnectTimer) {
       clearTimeout(this.reconnectTimer);
     }
+    this.connectionState = 'connecting';

Committable suggestion skipped: line range outside the PR's diff.


private setupWebSocketListeners() {
this.ws.addEventListener("open", this.onConnectionEstablished);
this.ws.addEventListener("close", this.onConnectionDisconnect);
this.ws.addEventListener("error", this.onConnectionError);
this.ws.addEventListener("message", this.handleMessage);
}
pyshx marked this conversation as resolved.
Show resolved Hide resolved

private setupDocListeners() {
this.doc.on("update", this.onDocUpdate);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add cleanup for doc listeners

The document update listener is not removed when the instance is destroyed, which could lead to memory leaks.

Add cleanup in the destroy method:

 private setupDocListeners() {
   this.doc.on("update", this.onDocUpdate);
 }
+
+private cleanupDocListeners() {
+  this.doc.off("update", this.onDocUpdate);
+}

And in the destroy method:

 destroy() {
+  this.cleanupDocListeners();
   if (this.reconnectTimer) {

Committable suggestion skipped: line range outside the PR's diff.


protected onConnectionEstablished() {
this.reconnectAttempts = 0;
this.socketReady = true;
this.initializeRoom().catch(console.error);
}

protected onConnectionDisconnect() {
this.socketReady = false;
this.firstSyncComplete = false;
this.reconnect();
}

protected onConnectionError(error: Event) {
console.error("WebSocket error:", error);
this.reconnect();
}
Comment on lines +129 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error event typing

The error event parameter should use the specific ErrorEvent type instead of the generic Event type.

-  protected onConnectionError(error: Event) {
+  protected onConnectionError(error: ErrorEvent) {
     console.error("WebSocket error:", error);
+    console.error("Error details:", error.error);
     this.reconnect();
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected onConnectionError(error: Event) {
console.error("WebSocket error:", error);
this.reconnect();
}
protected onConnectionError(error: ErrorEvent) {
console.error("WebSocket error:", error);
console.error("Error details:", error.error);
this.reconnect();
}


protected async onAuthenticateRequest() {
const token = await this.accessTokenProvider?.();
if (token && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "authenticate", token }));
}
}

protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
// Handle binary message (Yjs update)
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
Comment on lines +143 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add binary message validation

The binary message handling lacks validation of the message format and size.

       if (event.data instanceof ArrayBuffer) {
+        if (event.data.byteLength < 1) {
+          throw new Error('Invalid binary message: empty message');
+        }
+        if (event.data.byteLength > 1024 * 1024) { // 1MB limit
+          throw new Error('Invalid binary message: message too large');
+        }
         const update = new Uint8Array(event.data);
+        const messageType = update[0];
+        if (!Object.values(MessageType).includes(messageType)) {
+          throw new Error(`Invalid binary message: unknown message type ${messageType}`);
+        }
         await this.onPeerUpdate({ update });

Committable suggestion skipped: line range outside the PR's diff.

// Handle text message
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
Comment on lines +149 to +150
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add explicit error handling for JSON parsing

The JSON parsing could fail with invalid data, and the error should be handled explicitly.

-        const data = JSON.parse(event.data);
-        if (data.type === "authenticate") {
+        let data;
+        try {
+          data = JSON.parse(event.data);
+          if (!data || typeof data !== 'object') {
+            throw new Error('Invalid message format');
+          }
+        } catch (error) {
+          console.error('Failed to parse message:', error);
+          return;
+        }
+        if (data.type === "authenticate") {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
let data;
try {
data = JSON.parse(event.data);
if (!data || typeof data !== 'object') {
throw new Error('Invalid message format');
}
} catch (error) {
console.error('Failed to parse message:', error);
return;
}
if (data.type === "authenticate") {

await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
}
}
} catch (error) {
console.error("Error handling message:", error);
}
}
Comment on lines +141 to +159
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in message processing

The error handling in handleMessage could be more specific and provide better context.

Consider this improvement:

   protected async handleMessage(event: MessageEvent) {
     try {
       if (event.data instanceof ArrayBuffer) {
-        // Handle binary message (Yjs update)
+        if (event.data.byteLength === 0) {
+          throw new Error("Received empty binary message");
+        }
         const update = new Uint8Array(event.data);
         await this.onPeerUpdate({ update });
       } else if (typeof event.data === "string") {
-        // Handle text message
+        if (!event.data) {
+          throw new Error("Received empty text message");
+        }
         const data = JSON.parse(event.data);
+        if (!data.type) {
+          throw new Error("Message type not specified");
+        }
         if (data.type === "authenticate") {
           await this.onAuthenticateRequest();
         } else if (data.type === "ready") {
           this.onReady();
+        } else {
+          console.warn(`Unhandled message type: ${data.type}`);
         }
       }
     } catch (error) {
-      console.error("Error handling message:", error);
+      const errorMessage = error instanceof Error ? error.message : "Unknown error";
+      console.error(`Error handling WebSocket message: ${errorMessage}`);
+      if (error instanceof SyntaxError) {
+        console.error("Failed to parse message:", event.data);
+      }
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
// Handle binary message (Yjs update)
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
// Handle text message
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
}
}
} catch (error) {
console.error("Error handling message:", error);
}
}
protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
if (event.data.byteLength === 0) {
throw new Error("Received empty binary message");
}
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
if (!event.data) {
throw new Error("Received empty text message");
}
const data = JSON.parse(event.data);
if (!data.type) {
throw new Error("Message type not specified");
}
if (data.type === "authenticate") {
await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
} else {
console.warn(`Unhandled message type: ${data.type}`);
}
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "Unknown error";
console.error(`Error handling WebSocket message: ${errorMessage}`);
if (error instanceof SyntaxError) {
console.error("Failed to parse message:", event.data);
}
}
}


protected async initializeRoom() {
try {
await this.sendFlowMessage({
event: {
tag: "Create",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Join",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "Start",
content: {
project_id: this.projectId || "",
user: {
id: this.doc.clientID.toString(),
tenant_id: this.projectId,
name: "defaultName",
email: "[email protected]",
},
pyshx marked this conversation as resolved.
Show resolved Hide resolved
},
},
});

await this.syncData();
} catch (error) {
console.error("Failed to initialize room:", error);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add specific error handling for room initialization steps

The error handling in initializeRoom could be more granular to identify which step failed.

Consider this improvement:

   protected async initializeRoom() {
+    const steps = [
+      { name: 'Create Room', action: () => this.sendFlowMessage({
+        event: { tag: "Create", content: { room_id: this.doc.clientID.toString() } }
+      })},
+      { name: 'Join Room', action: () => this.sendFlowMessage({
+        event: { tag: "Join", content: { room_id: this.doc.clientID.toString() } }
+      })},
+      { name: 'Start Session', action: () => this.sendFlowMessage({
+        event: { tag: "Emit", content: { data: "" } },
+        session_command: { /* ... existing command ... */ }
+      })},
+      { name: 'Sync Data', action: () => this.syncData() }
+    ];
+
     try {
-      await this.sendFlowMessage({ /* ... */ });
-      await this.sendFlowMessage({ /* ... */ });
-      await this.sendFlowMessage({ /* ... */ });
-      await this.syncData();
+      for (const step of steps) {
+        try {
+          await step.action();
+        } catch (error) {
+          throw new Error(`Failed at step '${step.name}': ${error instanceof Error ? error.message : 'Unknown error'}`);
+        }
+      }
     } catch (error) {
-      console.error("Failed to initialize room:", error);
+      console.error(`Room initialization failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
+      throw error;
     }
   }

Committable suggestion skipped: line range outside the PR's diff.


async isReady(): Promise<boolean> {
if (this.socketReady) return true;
await sleep(100);
return await this.isReady();
}
pyshx marked this conversation as resolved.
Show resolved Hide resolved

protected onReady() {
this.socketReady = true;
}

protected async onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) {
const update =
data.update instanceof ArrayBuffer
? new Uint8Array(data.update)
: data.update;
Y.applyUpdate(this.doc, update, "peer");
this.onUpdateHandlers.forEach((handler) => handler(update));
}

async syncData() {
await this.isReady();

const stateVector = Y.encodeStateVector(this.doc);
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(stateVector);
}

if (!this.firstSyncComplete) {
this.firstSyncComplete = true;
queueMicrotask(() => this.syncData());
}
}

private async sendFlowMessage(message: FlowMessage): Promise<void> {
return new Promise((resolve, reject) => {
if (this.ws.readyState !== WebSocket.OPEN) {
reject(new Error("WebSocket is not connected"));
return;
}

try {
this.ws.send(JSON.stringify(message));
resolve();
} catch (error) {
reject(error);
}
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add timeout to sendFlowMessage

The sendFlowMessage method should include a timeout to prevent hanging operations.

Consider this improvement:

-  private async sendFlowMessage(message: FlowMessage): Promise<void> {
+  private async sendFlowMessage(message: FlowMessage, timeoutMs: number = 5000): Promise<void> {
     return new Promise((resolve, reject) => {
+      const timeoutId = setTimeout(() => {
+        reject(new Error(`Flow message timed out after ${timeoutMs}ms`));
+      }, timeoutMs);
+
       if (this.ws.readyState !== WebSocket.OPEN) {
+        clearTimeout(timeoutId);
         reject(new Error("WebSocket is not connected"));
         return;
       }
 
       try {
         this.ws.send(JSON.stringify(message));
+        clearTimeout(timeoutId);
         resolve();
       } catch (error) {
+        clearTimeout(timeoutId);
         reject(error);
       }
     });
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private async sendFlowMessage(message: FlowMessage): Promise<void> {
return new Promise((resolve, reject) => {
if (this.ws.readyState !== WebSocket.OPEN) {
reject(new Error("WebSocket is not connected"));
return;
}
try {
this.ws.send(JSON.stringify(message));
resolve();
} catch (error) {
reject(error);
}
});
}
private async sendFlowMessage(message: FlowMessage, timeoutMs: number = 5000): Promise<void> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Flow message timed out after ${timeoutMs}ms`));
}, timeoutMs);
if (this.ws.readyState !== WebSocket.OPEN) {
clearTimeout(timeoutId);
reject(new Error("WebSocket is not connected"));
return;
}
try {
this.ws.send(JSON.stringify(message));
clearTimeout(timeoutId);
resolve();
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
});
}


protected onDocUpdate(update: Uint8Array, origin: unknown) {
if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) {
pyshx marked this conversation as resolved.
Show resolved Hide resolved
this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "MergeUpdates",
content: {
project_id: this.projectId || "",
data: new Uint8Array(update),
updated_by: this.doc.clientID.toString(),
},
},
});
}
}

onUpdate(handler: (update: Uint8Array) => void) {
this.onUpdateHandlers.push(handler);
}
Comment on lines +321 to +323
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for update handlers

The update handler execution doesn't handle errors, which could prevent subsequent handlers from running.

   onUpdate(handler: (update: Uint8Array) => void) {
-    this.onUpdateHandlers.push(handler);
+    const wrappedHandler = (update: Uint8Array) => {
+      try {
+        handler(update);
+      } catch (error) {
+        console.error('Error in update handler:', error);
+      }
+    };
+    this.onUpdateHandlers.push(wrappedHandler);
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
onUpdate(handler: (update: Uint8Array) => void) {
this.onUpdateHandlers.push(handler);
}
onUpdate(handler: (update: Uint8Array) => void) {
const wrappedHandler = (update: Uint8Array) => {
try {
handler(update);
} catch (error) {
console.error('Error in update handler:', error);
}
};
this.onUpdateHandlers.push(wrappedHandler);
}


destroy() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}

if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "End",
content: {
project_id: this.projectId || "",
user: {
id: this.doc.clientID.toString(),
tenant_id: this.projectId,
name: "defaultName",
email: "[email protected]",
},
},
},
}).finally(() => {
this.ws.close();
});
}
this.doc.destroy();
}
pyshx marked this conversation as resolved.
Show resolved Hide resolved
}
31 changes: 31 additions & 0 deletions ui/src/lib/yjs/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { User } from "@flow/types/user";

export type YJsonPrimitive = string | number | boolean | null | Uint8Array;

export type YJsonValue =
Expand All @@ -6,3 +8,32 @@ export type YJsonValue =
| {
[key: string]: YJsonValue;
};

export type FlowMessage = {
event: {
tag: "Create" | "Join" | "Leave" | "Emit";
content: {
room_id?: string;
data?: string;
};
};
session_command?: SessionCommand;
};
pyshx marked this conversation as resolved.
Show resolved Hide resolved

export type SessionCommand = {
tag:
| "Start"
| "End"
| "Complete"
| "CheckStatus"
| "AddTask"
| "RemoveTask"
| "ListAllSnapshotsVersions"
| "MergeUpdates";
content: {
project_id: string;
user?: User;
data?: Uint8Array;
updated_by?: string;
};
};
Loading
Loading