Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ui): socket yjs manager & refactor #649

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open

Conversation

pyshx
Copy link
Contributor

@pyshx pyshx commented Nov 23, 2024

Overview

There's a couple of reason why we had to choose this custom-built to work with our specific Rust backend architecture.
Integrates with our session management system

It has

  • Custom authentication handling
  • Project-specific room management
  • we control over sync strategies that match our backend's expectations
  • error handling tailored to our needs

What I've done

I've added socketYjsManager to handle event based connection with the websocket backend.

What I haven't done

I haven't implemented Awareness on this implementation and There's a few more Event from the backend that hasn't been added just cause they aren't needed immediately.

How I tested

Once this PR is merged, you can run the websocket server in your local and try to play with collab editing.

Screenshot

Which point I want you to review particularly

socketYjsManager.ts

Memo

Summary by CodeRabbit

  • New Features

    • Added localization support for "Connection Error" in English, Spanish, French, Japanese, and Chinese.
    • Introduced new keys for various UI elements in Spanish, French, and Chinese, enhancing user interface messaging.
    • Added a new utility function to manage asynchronous delays.
  • Improvements

    • Enhanced WebSocket management for real-time collaboration with improved connection handling and document synchronization.
    • Streamlined Yjs document initialization and workflow management for better modularity.
  • Bug Fixes

    • Improved error handling for connection issues and message processing in WebSocket management.
  • Type Definitions

    • Expanded the User type with a new optional tenant_id property for more detailed user information.

Copy link
Contributor

coderabbitai bot commented Nov 23, 2024

Walkthrough

The changes in this pull request involve updates to several localization files for multiple languages, including English, Spanish, French, Japanese, and Chinese. New keys have been added to these files to support localization for various user interface messages, particularly focusing on connection errors and other UI elements. Additionally, a new utility function for handling delays has been introduced, along with significant enhancements to the WebSocket management for real-time collaboration, including a new class and updated types.

Changes

File Path Change Summary
ui/src/lib/i18n/locales/en.json Added new entry: "Connection Error": ""
ui/src/lib/i18n/locales/es.json Added new entries: "Connection Error": "", "3D visualization is not supported in this browser": "", "Choose action": "", "Subworkflow Node": "", "Manage Projects": "", "Ran by:": "", "Trigger:": ""
ui/src/lib/i18n/locales/fr.json Added new entries: "Connection Error": "", "Schema not found": "", "3D visualization is not supported in this browser": "", "Subworkflow Node": "", "Manage Projects": "", "Trigger1": ""
ui/src/lib/i18n/locales/ja.json Added new entry: "Connection Error": ""
ui/src/lib/i18n/locales/zh.json Added new entries: "Connection Error": "", "Schema not found": "", "Submit": "", "3D visualization is not supported in this browser": "", "Manage Projects": "", "Trigger1": ""
ui/src/lib/utils.ts Added function: export function sleep(ms: number)
ui/src/lib/yjs/socketYjsManager.ts Introduced SocketYjsManager class with methods for socket management, message handling, and document synchronization. Added types and enums.
ui/src/lib/yjs/types.ts Added types: FlowMessage, SessionCommand and enum: MessageType
ui/src/lib/yjs/useYjsStore.ts Refactored to use SocketYjsManager; updated initialization and handling of Yjs document and workflows.
ui/src/types/user.ts Added optional property: tenant_id?: string to User type

Possibly related PRs

Suggested reviewers

  • KaWaite

🐇 In the world of code, we hop and play,
New messages added, brightening the day!
From English to Spanish, and French too,
Connection Error's here, just for you!
With websockets dancing, our work's a delight,
Let's celebrate changes, all day and night! 🌟


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

netlify bot commented Nov 23, 2024

Deploy Preview for reearth-flow failed.

Name Link
🔨 Latest commit 3940df9
🔍 Latest deploy log https://app.netlify.com/sites/reearth-flow/deploys/674741a9921c3c0008a99f22

@pyshx pyshx marked this pull request as ready for review November 24, 2024 00:06
@pyshx pyshx requested a review from KaWaite as a code owner November 24, 2024 00:06
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

🧹 Outside diff range and nitpick comments (10)
ui/src/lib/utils.ts (1)

9-11: LGTM! Consider adding input validation and documentation.

The sleep function implementation is clean and follows standard practices. However, consider these improvements:

+/**
+ * Delays execution for the specified number of milliseconds.
+ * @param ms The number of milliseconds to sleep
+ * @throws {Error} If ms is negative
+ */
 export function sleep(ms: number) {
+  if (ms < 0) throw new Error("Sleep duration cannot be negative");
   return new Promise((resolve) => setTimeout(resolve, ms));
 }
ui/src/lib/yjs/types.ts (1)

23-39: Add JSDoc comments to document command purposes and required fields

The SessionCommand type includes various commands that might have different requirements and purposes. Consider adding documentation to clarify their usage.

Example documentation:

/**
 * Commands for managing collaborative sessions.
 * @property tag - The type of command to execute
 * @property content - Command-specific data
 */
export type SessionCommand = {
  tag:
    | "Start"     // Initiates a new session
    | "End"       // Terminates an active session
    | "Complete"  // Marks a session as completed
    // ... document other commands

Consider strengthening type safety for specific commands

Different commands might require different fields in their content. Consider using discriminated unions to enforce these requirements.

Example refactor:

type BaseContent = {
  project_id: string;
};

type StartContent = BaseContent & {
  user: User;  // Required for session start
};

export type SessionCommand = {
  tag: "Start";
  content: StartContent;
} | {
  tag: "MergeUpdates";
  content: BaseContent & {
    data: Uint8Array;  // Required for updates
    updated_by: string;  // Required for tracking
  };
} | {
  tag: "End" | "Complete" | "CheckStatus" | "AddTask" | "RemoveTask" | "ListAllSnapshotsVersions";
  content: BaseContent & {
    user?: User;
    data?: Uint8Array;
    updated_by?: string;
  };
};
ui/src/lib/i18n/locales/zh.json (2)

Line range hint 6-6: Add missing Chinese translations for UI elements.

The following keys have empty translations which will result in English text being shown to Chinese users:

  • "Schema not found"
  • "Submit"
  • "3D visualization is not supported in this browser"
  • "2D"
  • "3D"
  • "Manage Projects"
  • "Connection Error"

Please provide appropriate Chinese translations for these UI elements to ensure a consistent user experience.

Also applies to: 7-7, 11-11, 12-12, 13-13, 81-81, 228-228


Line range hint 124-124: Review the "Trigger1" key naming.

The key "Trigger1" appears to be a development placeholder. Consider using a more descriptive name that reflects its purpose (e.g., "TriggerManual", "TriggerScheduled", etc.).

ui/src/lib/i18n/locales/es.json (1)

Line range hint 11-11: Add missing Spanish translations for UI elements

Several new UI strings have been added without Spanish translations. This could impact the user experience for Spanish-speaking users.

Here are suggested translations for review:

-  "3D visualization is not supported in this browser": "",
+  "3D visualization is not supported in this browser": "La visualización 3D no es compatible con este navegador",
-  "2D": "",
+  "2D": "2D",
-  "3D": "",
+  "3D": "3D",
-  "Choose action": "",
+  "Choose action": "Elegir acción",
-  "Subworkflow Node": "",
+  "Subworkflow Node": "Nodo de subflujo de trabajo",
-  "Connection Error": "",
+  "Connection Error": "Error de conexión"

Also applies to: 12-13, 21-21, 39-39, 76-76, 228-228

ui/src/lib/i18n/locales/fr.json (4)

Line range hint 6-6: Add missing French translations for new keys

The following keys are missing their French translations:

  • "Schema not found" → Suggest: "Schéma non trouvé"
  • "3D visualization is not supported in this browser" → Suggest: "La visualisation 3D n'est pas prise en charge dans ce navigateur"
  • "Subworkflow Node" → Suggest: "Nœud de sous-flux"
  • "Manage Projects" → Suggest: "Gérer les projets"
  • "Trigger1" → Suggest: "Déclencheur1"
  • "Connection Error" → Suggest: "Erreur de connexion"

Apply this diff to add the missing translations:

-  "Schema not found": "",
+  "Schema not found": "Schéma non trouvé",
-  "3D visualization is not supported in this browser": "",
+  "3D visualization is not supported in this browser": "La visualisation 3D n'est pas prise en charge dans ce navigateur",
-  "Subworkflow Node": "",
+  "Subworkflow Node": "Nœud de sous-flux",
-  "Manage Projects": "",
+  "Manage Projects": "Gérer les projets",
-  "Trigger1": "",
+  "Trigger1": "Déclencheur1",
-  "Connection Error": "",
+  "Connection Error": "Erreur de connexion",

Also applies to: 11-11, 19-19, 73-73, 147-147, 228-228


Line range hint 12-13: Maintain consistent translation style for dimension labels

The "2D" and "3D" keys are left empty, but since they are standard technical terms, they should remain as is in French.

Apply this diff:

-  "2D": "",
-  "3D": "",
+  "2D": "2D",
+  "3D": "3D",

Line range hint 52-52: Add missing translation for "General shortcuts"

The "General shortcuts" key is missing its French translation while surrounding keys are properly translated.

Apply this diff:

-  "General shortcuts": "",
+  "General shortcuts": "Raccourcis généraux",

Missing translations detected across language files

Several keys are missing translations in multiple language files:

  • Only Japanese (ja.json) has translations for most of the new keys
  • Spanish (es.json) only has "Schema not found" translated
  • English (en.json), French (fr.json), and Chinese (zh.json) are missing translations for all the checked keys:
    • "Schema not found"
    • "3D visualization is not supported in this browser"
    • "Subworkflow Node"
    • "Manage Projects"
    • "Trigger1"
    • "Connection Error"
🔗 Analysis chain

Line range hint 1-232: Verify translations across all language files

The changes to this French localization file are part of a broader update across multiple language files. Ensure that all new keys are consistently added and translated across all supported languages.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for consistency of new keys across all language files

# Find all locale files
echo "Checking locale files for consistency..."
for locale in ui/src/lib/i18n/locales/*.json; do
  echo "=== $(basename "$locale") ==="
  # Check for empty translations of the new keys
  jq -r '
    .["Schema not found"],
    .["3D visualization is not supported in this browser"],
    .["Subworkflow Node"],
    .["Manage Projects"],
    .["Trigger1"],
    .["Connection Error"]
  ' "$locale"
done

Length of output: 2020

ui/src/lib/yjs/socketYjsManager.ts (1)

274-303: Ensure proper cleanup of event listeners in 'destroy' method

To prevent potential memory leaks and ensure proper resource management, consider removing event listeners from both the WebSocket and the Yjs document when destroying the manager.

Apply this diff to add event listener removal:

  destroy() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }

+   // Remove WebSocket event listeners
+   if (this.ws) {
+     this.ws.removeEventListener("open", this.onConnectionEstablished);
+     this.ws.removeEventListener("close", this.onConnectionDisconnect);
+     this.ws.removeEventListener("error", this.onConnectionError);
+     this.ws.removeEventListener("message", this.handleMessage);
+   }

+   // Remove Y.Doc event listeners
+   this.doc.off("update", this.onDocUpdate);

    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.sendFlowMessage({
        event: {
          tag: "Emit",
          content: { data: "" },
        },
        session_command: {
          tag: "End",
          content: {
            project_id: this.projectId || "",
            user: {
              id: this.doc.clientID.toString(),
              tenant_id: this.projectId,
              name: "defaultName",
              email: "[email protected]",
            },
          },
        },
      }).finally(() => {
        this.ws.close();
      });
    }
    this.doc.destroy();
  }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between fcf3755 and e855eb4.

📒 Files selected for processing (10)
  • ui/src/lib/i18n/locales/en.json (1 hunks)
  • ui/src/lib/i18n/locales/es.json (1 hunks)
  • ui/src/lib/i18n/locales/fr.json (1 hunks)
  • ui/src/lib/i18n/locales/ja.json (1 hunks)
  • ui/src/lib/i18n/locales/zh.json (1 hunks)
  • ui/src/lib/utils.ts (1 hunks)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
  • ui/src/lib/yjs/types.ts (2 hunks)
  • ui/src/lib/yjs/useYjsStore.ts (5 hunks)
  • ui/src/types/user.ts (1 hunks)
🔇 Additional comments (5)
ui/src/lib/utils.ts (1)

9-11: Verify usage context in socket manager.

Since this is part of the socket Yjs manager changes, let's verify how this utility is being used to ensure it's not creating potential performance issues.

✅ Verification successful

Let me gather more context about the sleep usage in the socket manager to understand its purpose better.


Sleep usage in socket manager is appropriate and safe

The sleep(100) usage in the isReady() method is a standard polling mechanism to check socket readiness with a reasonable delay. It's used in conjunction with recursion to wait for the socket connection to be established, which is a common pattern in WebSocket implementations. The delay is small (100ms) and the recursion ends as soon as socketReady becomes true, which happens in the onReady() method when the socket is properly initialized.

Key observations:

  • The sleep is only used during the initial connection phase
  • It's properly wrapped in async/await
  • The recursion has a clear exit condition (socketReady = true)
  • The delay is reasonable for a WebSocket handshake
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for sleep function usage in socket-related files
rg -l "sleep\(" | rg -i "socket|yjs" | xargs rg -A 5 "sleep\("

# Look for any retry mechanisms or timing-sensitive operations
rg -A 10 "new SocketYjsManager|extends SocketYjsManager" 

Length of output: 2404


Script:

#!/bin/bash
# Get the implementation context of isReady method where sleep is used
ast-grep --pattern 'isReady() {
  $$$
}'

# Get the socket manager class implementation to understand the overall flow
ast-grep --pattern 'class SocketYjsManager {
  $$$
}'

Length of output: 19947

ui/src/types/user.ts (1)

18-18: LGTM! The type definition is correct.

The optional tenant_id property is properly typed and maintains backward compatibility with existing code.

ui/src/lib/yjs/types.ts (2)

Line range hint 1-39: Verify integration with SocketYjsManager

These types appear to be used by the new SocketYjsManager. Let's verify the integration points to ensure proper usage.

✅ Verification successful

Types are properly integrated with SocketYjsManager and websocket backend

The verification shows that:

  • FlowMessage and SessionCommand types are correctly imported and used in socketYjsManager.ts
  • The types are properly integrated with the Rust backend in websocket/app/src/handler.rs and related services
  • All command types defined in the TypeScript interface match their Rust counterparts
  • The types are used consistently across the WebSocket communication layer for managing project edit sessions

The integration points are working as expected with no issues found.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check how these types are used in the socket manager
# Expected: Find usage of FlowMessage and SessionCommand in the manager

# Find SocketYjsManager implementation
ast-grep --pattern 'class SocketYjsManager {
  $$$
}'

# Find usage of the types
rg -t typescript "FlowMessage|SessionCommand" -A 5

Length of output: 19992


Script:

#!/bin/bash
# Let's try searching with ripgrep without file type restriction
# and check the usage of FlowMessage and SessionCommand

# Search for FlowMessage usage
rg "FlowMessage" -A 2

# Search for SessionCommand usage
rg "SessionCommand" -A 2

# Search for imports of these types
rg "import.*(?:FlowMessage|SessionCommand)"

Length of output: 11072


1-1: Verify the User type import usage

The User type is imported but only used optionally in SessionCommand. Let's verify if all required User properties are being utilized.

✅ Verification successful

The User type import is correctly used and required

The User type is appropriately imported and used in the SessionCommand type definition where it's needed as an optional property in the content object. The type definition from @flow/types/user matches the usage requirements, containing id, name, email, and optional tenant_id properties which are necessary for user identification in session commands.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check the User type definition and its usage
# Expected: Find the User type definition to understand required properties

# Find User type definition
ast-grep --pattern 'type User = {
  $$$
}'

# Find all usage of User type in the codebase
rg -t typescript "User" -A 5

Length of output: 345


Script:

#!/bin/bash
# Let's try with proper file extensions and check both the User type definition and its usage
# First, find all TypeScript files that mention User
rg "User" --type ts -A 5

# Then specifically look at our file of interest to see how User is used
cat ui/src/lib/yjs/types.ts

Length of output: 59778

ui/src/lib/yjs/useYjsStore.ts (1)

119-126: Ensure managerRef is initialized before using it

There is a possibility that managerRef.current might be null when undoTrackerActionWrapper is called, which would prevent the transaction from occurring. Verify that managerRef.current is properly initialized before any calls to undoTrackerActionWrapper.

@@ -15,6 +15,7 @@ export type User = {
id: string;
name: string;
email: string;
tenant_id?: string;
Copy link
Contributor

@coderabbitai coderabbitai bot Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

API contract mismatch detected between frontend type and GraphQL schema

The frontend User type in ui/src/types/user.ts includes a tenant_id field, but this field is not present in the GraphQL schema's User type (as seen in the generated types). This mismatch could lead to runtime issues since:

  • The GraphQL User type only includes: id, name, email, and host fields
  • None of the GraphQL queries/mutations handle the tenant_id field
  • The generated types don't include tenant_id in their definitions
🔗 Analysis chain

Verify API contract and data handling.

Since this type likely corresponds to API responses and requests, please ensure:

  1. The backend API contract matches this type definition
  2. Any serialization/deserialization logic properly handles the new tenant_id field

Let's check for API-related files and serialization logic:


Let me gather more information about the GraphQL schema and types to verify the API contract.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for API endpoints and serialization logic related to User type

# Check for API endpoints handling user data
rg -A 5 "user|me" "ui/src/api"

# Check for any serialization/deserialization logic
rg -A 5 "JSON\.(parse|stringify).*User" "ui/src"

# Look for GraphQL queries/mutations related to user
rg -A 5 "query.*User|mutation.*User" "ui/src"

Length of output: 17013


Script:

#!/bin/bash
# Check GraphQL schema and type definitions
rg -A 10 "type User|input.*User|interface.*User" "ui/src"

# Check for tenant_id usage in GraphQL queries/mutations
rg -A 5 "tenant_id" "ui/src/lib/gql"

# Look for generated types related to User
rg -A 5 "export (type|interface) User" "ui/src"

Length of output: 7778

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest refactor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pyshx, to address the API contract mismatch between the frontend User type and the GraphQL schema, you have a couple of options:

  1. Update the GraphQL Schema to Include tenant_id:

    If the tenant_id field is needed on the frontend, you should update the GraphQL schema to include this field in the User type. This ensures consistency between the frontend types and the backend API responses.

    Example GraphQL Schema Update:

    type User {
      id: ID!
      name: String!
      email: String!
      host: String
      tenant_id: ID
    }

    Make sure to also update any GraphQL resolvers to handle the tenant_id field appropriately.

  2. Remove tenant_id from the Frontend User Type:

    If the tenant_id is not required on the frontend, you can remove it from the User type definition in ui/src/types/user.ts to match the backend schema.

    Updated ui/src/types/user.ts:

    export type User = {
      id: string;
      name: string;
      email: string;
      // tenant_id?: string; // Remove this line
    };

    Ensure that any code depending on tenant_id is also updated accordingly.


Let me know if you need further assistance with this refactor!

ui/src/lib/yjs/types.ts Outdated Show resolved Hide resolved
ui/src/lib/i18n/locales/en.json Show resolved Hide resolved
ui/src/lib/i18n/locales/ja.json Show resolved Hide resolved
ui/src/lib/i18n/locales/es.json Show resolved Hide resolved
ui/src/lib/yjs/useYjsStore.ts Show resolved Hide resolved
ui/src/lib/yjs/useYjsStore.ts Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Outdated Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
ui/src/lib/yjs/socketYjsManager.ts (4)

235-249: Refactor 'sendFlowMessage' to avoid unnecessary Promise creation

The sendFlowMessage method wraps an async operation inside a new Promise, which is unnecessary and considered an anti-pattern. You can simplify the function by removing the explicit Promise construction and letting the function's async nature handle the returned Promise.

Apply this diff to simplify the function:

-private async sendFlowMessage(message: FlowMessage): Promise<void> {
-  return new Promise((resolve, reject) => {
-    if (this.ws.readyState !== WebSocket.OPEN) {
-      reject(new Error("WebSocket is not connected"));
-      return;
-    }
-
-    try {
-      this.ws.send(JSON.stringify(message));
-      resolve();
-    } catch (error) {
-      reject(error);
-    }
-  });
+private async sendFlowMessage(message: FlowMessage): Promise<void> {
+  if (this.ws.readyState !== WebSocket.OPEN) {
+    throw new Error("WebSocket is not connected");
+  }
+  try {
+    this.ws.send(JSON.stringify(message));
+  } catch (error) {
+    throw error;
+  }
 }

141-159: Handle unexpected 'event.data' types in 'handleMessage'

Currently, the handleMessage method assumes that event.data is either an ArrayBuffer or a string. To enhance robustness, consider adding handling for unexpected data types to prevent silent failures or unhandled exceptions.

Add an else clause to log or handle unexpected data types:

     } else if (typeof event.data === "string") {
       // Handle text message
       const data = JSON.parse(event.data);
       if (data.type === "authenticate") {
         await this.onAuthenticateRequest();
       } else if (data.type === "ready") {
         this.onReady();
+      } else {
+        console.warn(`Unhandled message type received: ${data.type}`);
       }
+    } else {
+      console.warn("Received message with unexpected data type:", typeof event.data);
     }

147-155: Add default case for unrecognized message types

In the handleMessage method, only specific message types are processed. Adding a default case helps in logging or handling any unexpected message types, which is beneficial for debugging and future message implementations.

Consider updating the code to handle unrecognized message types:

     } else if (typeof event.data === "string") {
       // Handle text message
       const data = JSON.parse(event.data);
       if (data.type === "authenticate") {
         await this.onAuthenticateRequest();
       } else if (data.type === "ready") {
         this.onReady();
+      } else {
+        console.warn(`Received unrecognized message type: ${data.type}`);
       }
     }

68-68: Remove console logs or adjust logging level for production

The console log statement revealing the WebSocket connection details might not be suitable for a production environment. It could expose sensitive information or clutter the console output.

Consider removing the console log or using a debug logger:

-    console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
+    // For debugging purposes only:
+    // console.debug("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between e855eb4 and a8cbafe.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
🔇 Additional comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)

135-139: Verify that 'accessTokenProvider' is defined before use

The method onAuthenticateRequest uses optional chaining when calling accessTokenProvider, which could lead to token being undefined if the provider is not set. Ensure that accessTokenProvider is always defined when this method is invoked.

Please verify that accessTokenProvider is always provided and cannot be undefined at this point. If it can be undefined, consider adding error handling or default behavior.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Outside diff range and nitpick comments (9)
websocket/crates/infra/src/persistence/redis/updates.rs (2)

125-132: Use appropriate logging levels for error messages

Currently, error conditions are logged using the debug! macro. For better visibility of errors, consider using error! or warn! to ensure that these critical events are properly surfaced in the logs.

Apply this change to adjust the logging level:

- debug!(
+ error!(

Also applies to: 153-162, 175-178


150-164: Refactor duplicate code when applying updates

The code blocks applying redis_update and stream_update are nearly identical. Extracting the shared logic into a helper function will reduce duplication and enhance maintainability.

Apply this diff to refactor the duplicated code:

+fn apply_update(
+    txn: &mut TransactionMut,
+    update_data: &[u8],
+    update_name: &str,
+) -> Result<(), FlowProjectRedisDataManagerError> {
+    if !update_data.is_empty() {
+        debug!("Applying {} of size {}", update_name, update_data.len());
+        match Update::decode_v2(update_data) {
+            Ok(update) => {
+                if let Err(e) = txn.apply_update(update) {
+                    error!("Error applying {}: {:?}", update_name, e);
+                    return Err(FlowProjectRedisDataManagerError::from(e));
+                }
+            }
+            Err(e) => {
+                error!("Error decoding {}: {:?}", update_name, e);
+                return Err(FlowProjectRedisDataManagerError::from(e));
+            }
+        }
+    }
+    Ok(())
+}

- if !redis_update.is_empty() {
-     debug!("Applying redis update of size {}", redis_update.len());
-     match Update::decode_v2(&redis_update) {
-         Ok(update) => {
-             if let Err(e) = txn.apply_update(update) {
-                 debug!("Error applying redis update: {:?}", e);
-                 return Err(FlowProjectRedisDataManagerError::from(e));
-             }
-         }
-         Err(e) => {
-             debug!("Error decoding redis update: {:?}", e);
-             return Err(FlowProjectRedisDataManagerError::from(e));
-         }
-     }
- }
+ apply_update(&mut txn, &redis_update, "redis update")?;

- if !stream_update.is_empty() {
-     debug!("Applying stream update of size {}", stream_update.len());
-     match Update::decode_v2(&stream_update) {
-         Ok(update) => {
-             if let Err(e) = txn.apply_update(update) {
-                 debug!("Error applying stream update: {:?}", e);
-                 return Err(FlowProjectRedisDataManagerError::from(e));
-             }
-         }
-         Err(e) => {
-             debug!("Error decoding stream update: {:?}", e);
-             return Err(FlowProjectRedisDataManagerError::from(e));
-         }
-     }
- }
+ apply_update(&mut txn, &stream_update, "stream update")?;

Also applies to: 166-180

websocket/app/examples/session_client.rs (2)

Line range hint 1-248: Enhance example to better demonstrate integration patterns

While this example effectively shows the WebSocket functionality, consider these improvements to better align with the PR's objectives:

  1. Add configuration management:
struct Config {
    host: String,
    port: u16,
    user_id: String,
    project_id: String,
    room_id: String,
    auth_token: String,
}
  1. Add error handling examples for common WebSocket scenarios:
  • Connection drops
  • Authentication failures
  • Invalid messages
  1. Add comments explaining the integration with the new SocketYjsManager:
// This example demonstrates the server-side handling of WebSocket connections
// that integrate with the frontend SocketYjsManager (see ui/src/lib/yjs/socketYjsManager.ts)

Would you like me to provide a complete example implementation with these improvements?


Session commands are misaligned between frontend and backend

The verification reveals mismatches between the Rust and TypeScript implementations:

  • Frontend SessionCommand type includes additional commands: MergeUpdates
  • Frontend's command structure uses tag and content fields, while backend uses named struct variants
  • Frontend's command content includes optional fields like data and updated_by not present in backend

Required changes:

  • Add MergeUpdates variant to Rust's SessionCommand enum
  • Align the command structure by either:
    • Using tagged unions in Rust to match TypeScript's structure, or
    • Updating TypeScript to use direct field access like Rust
🔗 Analysis chain

Line range hint 13-48: Verify alignment with frontend session commands

The Event and SessionCommand enums must match the protocol expected by the new SocketYjsManager.

Let's verify the alignment:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the session commands align with the frontend implementation

# Search for corresponding TypeScript types/interfaces
rg -t typescript "type (SessionCommand|FlowMessage)" -A 10

# Search for handling of these commands in the socket manager
ast-grep --pattern 'class SocketYjsManager {
  $$$
  handle$_($$$) {
    $$$
  }
  $$$
}'

Length of output: 187


Script:

#!/bin/bash
# Let's try different approaches to find the frontend implementation

# Search for any files containing SocketYjsManager
fd -e ts -e js -e tsx -e jsx . | xargs rg "SocketYjsManager"

# Search for session-related types/interfaces in all possible frontend files
fd -e ts -e js -e tsx -e jsx . | xargs rg "type.*Session|interface.*Session|enum.*Session"

# Look for websocket message handling in frontend
fd -e ts -e js -e tsx -e jsx . | xargs rg "socket\.on|socket\.emit"

# Search for any files that might contain session command definitions
fd -e ts -e js -e tsx -e jsx . | xargs rg "Start|End|Complete|CheckStatus|AddTask|RemoveTask"

Length of output: 4489


Script:

#!/bin/bash
# Let's examine the session command types in the frontend

# Get the complete SessionCommand type definition
cat ui/src/lib/yjs/types.ts

# Check the actual socket message handling in SocketYjsManager
cat ui/src/lib/yjs/socketYjsManager.ts

Length of output: 9751

ui/src/lib/yjs/socketYjsManager.ts (1)

254-274: Implement rate limiting for updates

The update handling lacks rate limiting, which could lead to performance issues with rapid updates.

Consider implementing a debounce mechanism:

+  private updateDebounceTimeout: NodeJS.Timeout | null = null;
+  private readonly DEBOUNCE_DELAY = 50; // ms

   protected onDocUpdate(update: Uint8Array, origin: unknown) {
     if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) {
+      if (this.updateDebounceTimeout) {
+        clearTimeout(this.updateDebounceTimeout);
+      }
+
+      this.updateDebounceTimeout = setTimeout(() => {
         const stateVector = Y.encodeStateVectorFromUpdateV2(update);
         const diffUpdate = Y.diffUpdateV2(update, stateVector);
         
         this.sendFlowMessage({
           // ... existing message structure
         });
+      }, this.DEBOUNCE_DELAY);
     }
   }
websocket/crates/services/src/manage_project_edit_session.rs (4)

283-287: Consider adding backoff strategy for channel errors.

While the current error handling is good, consider implementing an exponential backoff strategy for channel errors to prevent overwhelming the system during issues.

- sleep(Duration::from_secs(1)).await;
+ let backoff = ExponentialBackoff::default();
+ backoff.retry(|| async {
+     // Retry logic here
+ }).await;

169-183: Add error case logging in Complete command.

The Complete command's error handling could be enhanced with explicit error logging.

 if let Some(mut session) = self.get_latest_session(&project_id).await? {
     debug!("Found latest session: {:?}", session);
     let result = self.complete_job_if_met_requirements(&mut session).await;
-    debug!("Job completion result: {:?}", result);
+    match &result {
+        Ok(_) => debug!("Job completion successful"),
+        Err(e) => debug!("Job completion failed: {:?}", e),
+    }
     result?;

Line range hint 13-20: Consider making timeout constants configurable.

The hardcoded timeout values could be made configurable through environment variables or configuration files for better flexibility in different environments.

-const MAX_EMPTY_SESSION_DURATION: Duration = Duration::from_secs(10);
-const JOB_COMPLETION_DELAY: Duration = Duration::from_secs(5);
+lazy_static! {
+    static ref MAX_EMPTY_SESSION_DURATION: Duration = Duration::from_secs(
+        std::env::var("MAX_EMPTY_SESSION_DURATION_SECS")
+            .unwrap_or("10".to_string())
+            .parse()
+            .unwrap_or(10)
+    );
+    static ref JOB_COMPLETION_DELAY: Duration = Duration::from_secs(
+        std::env::var("JOB_COMPLETION_DELAY_SECS")
+            .unwrap_or("5".to_string())
+            .parse()
+            .unwrap_or(5)
+    );
+}

Line range hint 396-421: Consider adding metrics for session management.

The session management logic would benefit from metrics collection to monitor system health and performance.

 pub async fn end_editing_session_if_conditions_met(
     &self,
     session: &mut ProjectEditingSession,
     data: &ManageProjectEditSessionTaskData,
 ) -> Result<(), ProjectServiceError> {
+    let start = std::time::Instant::now();
     if let Some(client_count) = *data.client_count.read().await {
         if client_count == 0 {
             if let Some(clients_disconnected_at) = *data.clients_disconnected_at.read().await {
                 let current_time = Utc::now();
                 let clients_disconnection_elapsed_time = current_time - clients_disconnected_at;
 
                 if clients_disconnection_elapsed_time
                     .to_std()
                     .map_err(ProjectServiceError::ChronoDurationConversionError)?
                     > MAX_EMPTY_SESSION_DURATION
                 {
                     self.project_service
                         .end_session("system".to_string(), session.clone())
                         .await?;
+                    metrics::increment_counter!("session_ended_by_timeout");
                 }
             }
         }
     }
+    metrics::histogram!("session_check_duration_ms", start.elapsed().as_millis() as f64);
     Ok(())
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between a8cbafe and 7a606b0.

📒 Files selected for processing (4)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
  • websocket/app/examples/session_client.rs (1 hunks)
  • websocket/crates/infra/src/persistence/redis/updates.rs (1 hunks)
  • websocket/crates/services/src/manage_project_edit_session.rs (1 hunks)
🔇 Additional comments (3)
websocket/crates/infra/src/persistence/redis/updates.rs (1)

104-109: Enhanced logging improves traceability

The added debug statements at the start of merge_updates_internal provide valuable context for tracing the merge operation.

websocket/crates/services/src/manage_project_edit_session.rs (2)

151-152: LGTM! Comprehensive debug logging implementation.

The debug logging implementation provides excellent visibility into command processing flow, with clear entry and exit points.

Also applies to: 290-290


184-200: 🛠️ Refactor suggestion

Consider adding data validation for MergeUpdates.

The MergeUpdates command should validate the data size before processing to prevent potential memory issues.

 SessionCommand::MergeUpdates {
     project_id,
     data,
     updated_by,
 } => {
+    const MAX_UPDATE_SIZE: usize = 10 * 1024 * 1024; // 10MB
+    if data.len() > MAX_UPDATE_SIZE {
+        debug!("Update size {} exceeds maximum allowed size", data.len());
+        return Err(ProjectServiceError::InvalidInput("Update size too large".into()));
+    }
     debug!(
         "Merging updates for project: {}, updated by: {:?}, data length: {}",
         project_id,
         updated_by,
         data.len()
     );

websocket/crates/infra/src/persistence/redis/updates.rs Outdated Show resolved Hide resolved
websocket/app/examples/session_client.rs Outdated Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Outdated Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Outdated Show resolved Hide resolved
Comment on lines +58 to +60
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
Copy link
Contributor

@coderabbitai coderabbitai bot Nov 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security: Avoid exposing sensitive data in URL parameters

The WebSocket URL includes sensitive information (user_id, project_id, token) as query parameters, which can be logged or exposed in various ways. Consider using headers or a more secure authentication mechanism.

Consider:

  1. Using a secure WebSocket handshake protocol
  2. Moving authentication to headers or a dedicated authentication message
  3. Implementing token refresh mechanism

Apply this diff as a starting point:

-      wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
-      wsUrl.searchParams.set("project_id", data.projectId);
-      wsUrl.searchParams.set("token", token);
+      // Initialize basic connection
+      this.ws = new WebSocket(wsUrl.href);
+      
+      // Send authentication message after connection
+      this.ws.addEventListener("open", () => {
+        this.ws.send(JSON.stringify({
+          type: "authenticate",
+          user_id: this.doc.clientID.toString(),
+          project_id: data.projectId,
+          token
+        }));
+      });

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pyshx Long term this is better probably. Make sure to look at this suggestion.
coderabbitai, I don't need you to do anything or reply.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

ui/src/lib/yjs/socketYjsManager.ts Outdated Show resolved Hide resolved
ui/src/lib/yjs/socketYjsManager.ts Show resolved Hide resolved
Comment on lines +41 to +42
// Initialize with default workflow if empty
if (yWorkflows.length === 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't necessary since the doc will always be new, and the useState's initial values (aka this code) will never re-run.

manager
.setupSocket({
url: websocketUrl,
roomId: workflowId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflowId is probably needing to be changed to projectId. Unless we want a room created and used per workflow/subworkflow. @pyshx What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah since we are passing the projectId too, I guess that IS the idea. Right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But each room will have the same Y.array doc with all workflows anyways, so if anything this will just cause some perf issues around changing between canvas... hmmm

// Create and setup socket manager
if (!doc) return;
const manager = new SocketYjsManager(doc);
manager
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be awaited, right? If not an un-initiated manager will potentially be passed to the undomgr

if (!websocketUrl || !websocketToken || !workflowId || !currentProject?.id)
return;

const doc = managerRef.current?.getDoc() || yWorkflows.doc;
Copy link
Member

@KaWaite KaWaite Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't necessary since the useEffect is already doing a cleanup itself in the return statement. (aka managerRef.current will always be null at this point. If that isn't what you were hoping for, this code needs a refactor a bit probably)

content: {
project_id: this.projectId || "",
data: new Uint8Array(diffUpdate),
updated_by: this.doc.clientID.toString()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used internally only for the websocket server to be able to resolve conflicts between session users? If we want to be able to reference this across sessions, we don't want the clientID, which I believe is auto-generated on each new session connection. We would want the user id probably.

Comment on lines +87 to +90
return () => {
manager.destroy();
undoMngr.destroy();
managerRef.current = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is manager.destroy() necessary if we are setting the managerRef.current to null anyways?

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)

14-19: Add type safety to binary message creation

The binary message creation utility could benefit from additional type safety.

Consider this improvement:

-function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+function createBinaryMessage(type: MessageType, data: Uint8Array | ArrayBuffer): Uint8Array {
+  const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data;
   const message = new Uint8Array(data.length + 1);
   message[0] = type;
-  message.set(data, 1);
+  message.set(buffer, 1);
   return message;
}

29-31: Make reconnection parameters configurable

The reconnection parameters are hardcoded. Consider making them configurable through constructor options for better flexibility.

Consider this improvement:

+interface SocketYjsManagerOptions {
+  maxReconnectAttempts?: number;
+  reconnectDelay?: number;
+}

 export class SocketYjsManager {
   protected reconnectAttempts = 0;
-  protected maxReconnectAttempts = 5;
-  protected reconnectDelay = 1000;
+  protected maxReconnectAttempts: number;
+  protected reconnectDelay: number;

-  constructor(doc: Y.Doc) {
+  constructor(doc: Y.Doc, options: SocketYjsManagerOptions = {}) {
     this.doc = doc;
+    this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
+    this.reconnectDelay = options.reconnectDelay ?? 1000;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 7a606b0 and b636507.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)

Comment on lines +99 to +111
try {
if (this.ws) {
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add connection cleanup before reconnection

The reconnection attempt should clean up the existing connection before establishing a new one.

Consider this improvement:

   this.reconnectTimer = setTimeout(async () => {
     try {
       if (this.ws) {
+        // Clean up existing connection
+        this.ws.removeEventListener("open", this.onConnectionEstablished);
+        this.ws.removeEventListener("close", this.onConnectionDisconnect);
+        this.ws.removeEventListener("error", this.onConnectionError);
+        this.ws.removeEventListener("message", this.handleMessage);
+        if (this.ws.readyState === WebSocket.OPEN) {
+          this.ws.close();
+        }
+
         const originalUrl = new URL(this.ws.url);
         const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
         const roomId = originalUrl.pathname.slice(1);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
if (this.ws) {
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {
try {
if (this.ws) {
// Clean up existing connection
this.ws.removeEventListener("open", this.onConnectionEstablished);
this.ws.removeEventListener("close", this.onConnectionDisconnect);
this.ws.removeEventListener("error", this.onConnectionError);
this.ws.removeEventListener("message", this.handleMessage);
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.close();
}
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {

Comment on lines 173 to 212
protected async initializeRoom() {
try {
await this.sendFlowMessage({
event: {
tag: "Create",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Join",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "Start",
content: {
project_id: this.projectId || "",
user: {
id: this.doc.clientID.toString(),
tenant_id: this.projectId,
name: "defaultName",
email: "[email protected]",
},
},
},
});

await this.syncData();
} catch (error) {
console.error("Failed to initialize room:", error);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add specific error handling for room initialization steps

The error handling in initializeRoom could be more granular to identify which step failed.

Consider this improvement:

   protected async initializeRoom() {
+    const steps = [
+      { name: 'Create Room', action: () => this.sendFlowMessage({
+        event: { tag: "Create", content: { room_id: this.doc.clientID.toString() } }
+      })},
+      { name: 'Join Room', action: () => this.sendFlowMessage({
+        event: { tag: "Join", content: { room_id: this.doc.clientID.toString() } }
+      })},
+      { name: 'Start Session', action: () => this.sendFlowMessage({
+        event: { tag: "Emit", content: { data: "" } },
+        session_command: { /* ... existing command ... */ }
+      })},
+      { name: 'Sync Data', action: () => this.syncData() }
+    ];
+
     try {
-      await this.sendFlowMessage({ /* ... */ });
-      await this.sendFlowMessage({ /* ... */ });
-      await this.sendFlowMessage({ /* ... */ });
-      await this.syncData();
+      for (const step of steps) {
+        try {
+          await step.action();
+        } catch (error) {
+          throw new Error(`Failed at step '${step.name}': ${error instanceof Error ? error.message : 'Unknown error'}`);
+        }
+      }
     } catch (error) {
-      console.error("Failed to initialize room:", error);
+      console.error(`Room initialization failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
+      throw error;
     }
   }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +63 to +84
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;

// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers

this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";

this.setupWebSocketListeners();
this.setupDocListeners();

console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in setupSocket

The error handling could be more specific and provide better context for debugging.

Consider this improvement:

   try {
     const token = await this.accessTokenProvider();
+    if (!token) {
+      throw new Error("Failed to obtain access token");
+    }
     const wsUrl = new URL(data.url);
     wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
     wsUrl.pathname = `/${data.roomId}`;

     // Add query parameters for authentication
     wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
     wsUrl.searchParams.set("project_id", data.projectId);
     wsUrl.searchParams.set("token", token);

     this.ws = new WebSocket(wsUrl.href);
     this.ws.binaryType = "arraybuffer";

     this.setupWebSocketListeners();
     this.setupDocListeners();

     console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
   } catch (error) {
-    console.error("Failed to setup WebSocket:", error);
+    const errorMessage = error instanceof Error ? error.message : "Unknown error";
+    console.error(`Failed to setup WebSocket: ${errorMessage}`);
+    if (error instanceof TypeError) {
+      throw new Error(`Invalid WebSocket URL: ${errorMessage}`);
+    }
     throw error;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
try {
const token = await this.accessTokenProvider();
if (!token) {
throw new Error("Failed to obtain access token");
}
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token);
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "Unknown error";
console.error(`Failed to setup WebSocket: ${errorMessage}`);
if (error instanceof TypeError) {
throw new Error(`Invalid WebSocket URL: ${errorMessage}`);
}
throw error;
}

Comment on lines 252 to 266
private async sendFlowMessage(message: FlowMessage): Promise<void> {
return new Promise((resolve, reject) => {
if (this.ws.readyState !== WebSocket.OPEN) {
reject(new Error("WebSocket is not connected"));
return;
}

try {
this.ws.send(JSON.stringify(message));
resolve();
} catch (error) {
reject(error);
}
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add timeout to sendFlowMessage

The sendFlowMessage method should include a timeout to prevent hanging operations.

Consider this improvement:

-  private async sendFlowMessage(message: FlowMessage): Promise<void> {
+  private async sendFlowMessage(message: FlowMessage, timeoutMs: number = 5000): Promise<void> {
     return new Promise((resolve, reject) => {
+      const timeoutId = setTimeout(() => {
+        reject(new Error(`Flow message timed out after ${timeoutMs}ms`));
+      }, timeoutMs);
+
       if (this.ws.readyState !== WebSocket.OPEN) {
+        clearTimeout(timeoutId);
         reject(new Error("WebSocket is not connected"));
         return;
       }
 
       try {
         this.ws.send(JSON.stringify(message));
+        clearTimeout(timeoutId);
         resolve();
       } catch (error) {
+        clearTimeout(timeoutId);
         reject(error);
       }
     });
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private async sendFlowMessage(message: FlowMessage): Promise<void> {
return new Promise((resolve, reject) => {
if (this.ws.readyState !== WebSocket.OPEN) {
reject(new Error("WebSocket is not connected"));
return;
}
try {
this.ws.send(JSON.stringify(message));
resolve();
} catch (error) {
reject(error);
}
});
}
private async sendFlowMessage(message: FlowMessage, timeoutMs: number = 5000): Promise<void> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Flow message timed out after ${timeoutMs}ms`));
}, timeoutMs);
if (this.ws.readyState !== WebSocket.OPEN) {
clearTimeout(timeoutId);
reject(new Error("WebSocket is not connected"));
return;
}
try {
this.ws.send(JSON.stringify(message));
clearTimeout(timeoutId);
resolve();
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
});
}

Comment on lines +153 to +171
protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
// Handle binary message (Yjs update)
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
// Handle text message
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
}
}
} catch (error) {
console.error("Error handling message:", error);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in message processing

The error handling in handleMessage could be more specific and provide better context.

Consider this improvement:

   protected async handleMessage(event: MessageEvent) {
     try {
       if (event.data instanceof ArrayBuffer) {
-        // Handle binary message (Yjs update)
+        if (event.data.byteLength === 0) {
+          throw new Error("Received empty binary message");
+        }
         const update = new Uint8Array(event.data);
         await this.onPeerUpdate({ update });
       } else if (typeof event.data === "string") {
-        // Handle text message
+        if (!event.data) {
+          throw new Error("Received empty text message");
+        }
         const data = JSON.parse(event.data);
+        if (!data.type) {
+          throw new Error("Message type not specified");
+        }
         if (data.type === "authenticate") {
           await this.onAuthenticateRequest();
         } else if (data.type === "ready") {
           this.onReady();
+        } else {
+          console.warn(`Unhandled message type: ${data.type}`);
         }
       }
     } catch (error) {
-      console.error("Error handling message:", error);
+      const errorMessage = error instanceof Error ? error.message : "Unknown error";
+      console.error(`Error handling WebSocket message: ${errorMessage}`);
+      if (error instanceof SyntaxError) {
+        console.error("Failed to parse message:", event.data);
+      }
     }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
// Handle binary message (Yjs update)
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
// Handle text message
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
}
}
} catch (error) {
console.error("Error handling message:", error);
}
}
protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
if (event.data.byteLength === 0) {
throw new Error("Received empty binary message");
}
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
if (!event.data) {
throw new Error("Received empty text message");
}
const data = JSON.parse(event.data);
if (!data.type) {
throw new Error("Message type not specified");
}
if (data.type === "authenticate") {
await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
} else {
console.warn(`Unhandled message type: ${data.type}`);
}
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "Unknown error";
console.error(`Error handling WebSocket message: ${errorMessage}`);
if (error instanceof SyntaxError) {
console.error("Failed to parse message:", event.data);
}
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)

14-19: Add input validation to createBinaryMessage function

The function should validate its inputs to prevent potential issues with invalid data.

 function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+  if (!(type in MessageType)) {
+    throw new Error(`Invalid message type: ${type}`);
+  }
+  if (!(data instanceof Uint8Array)) {
+    throw new Error('Data must be a Uint8Array');
+  }
   const message = new Uint8Array(data.length + 1);
   message[0] = type;
   message.set(data, 1);
   return message;
 }

244-258: Add message queue for handling disconnections

Messages sent during disconnection are lost. Consider implementing a message queue to handle this scenario.

+  private messageQueue: FlowMessage[] = [];
+  private readonly MAX_QUEUE_SIZE = 100;
+
   private async sendFlowMessage(message: FlowMessage): Promise<void> {
     return new Promise((resolve, reject) => {
       if (this.ws.readyState !== WebSocket.OPEN) {
-        reject(new Error("WebSocket is not connected"));
-        return;
+        if (this.messageQueue.length < this.MAX_QUEUE_SIZE) {
+          this.messageQueue.push(message);
+          resolve();
+        } else {
+          reject(new Error("Message queue full, connection lost"));
+        }
+        return;
       }
+
+      // Try to send queued messages first
+      while (this.messageQueue.length > 0 && this.ws.readyState === WebSocket.OPEN) {
+        const queuedMessage = this.messageQueue.shift();
+        if (queuedMessage) {
+          this.ws.send(JSON.stringify(queuedMessage));
+        }
+      }

       try {
         this.ws.send(JSON.stringify(message));
         resolve();
       } catch (error) {
         reject(error);
       }
     });
   }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b636507 and 839ad71.

📒 Files selected for processing (2)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
  • ui/src/lib/yjs/types.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • ui/src/lib/yjs/types.ts

Comment on lines 216 to 225
protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) {
const update = data.update instanceof ArrayBuffer
? new Uint8Array(data.update)
: data.update;

const currentState = Y.encodeStateAsUpdateV2(this.doc);
const diffUpdate = Y.diffUpdateV2(update, currentState);
Y.applyUpdateV2(this.doc, diffUpdate, 'peer');
this.onUpdateHandlers.forEach((handler) => handler(update));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add sync state validation before applying updates

The onPeerUpdate method should validate the sync state before applying updates to prevent potential data corruption.

 protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) {
+  if (!this.socketReady || !this.firstSyncComplete) {
+    console.warn('Received peer update before sync completion');
+    return;
+  }
+
   const update = data.update instanceof ArrayBuffer
     ? new Uint8Array(data.update)
     : data.update;

   const currentState = Y.encodeStateAsUpdateV2(this.doc);
   const diffUpdate = Y.diffUpdateV2(update, currentState);
   Y.applyUpdateV2(this.doc, diffUpdate, 'peer');
   this.onUpdateHandlers.forEach((handler) => handler(update));
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +161 to +162
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add explicit error handling for JSON parsing

The JSON parsing could fail with invalid data, and the error should be handled explicitly.

-        const data = JSON.parse(event.data);
-        if (data.type === "authenticate") {
+        let data;
+        try {
+          data = JSON.parse(event.data);
+          if (!data || typeof data !== 'object') {
+            throw new Error('Invalid message format');
+          }
+        } catch (error) {
+          console.error('Failed to parse message:', error);
+          return;
+        }
+        if (data.type === "authenticate") {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const data = JSON.parse(event.data);
if (data.type === "authenticate") {
let data;
try {
data = JSON.parse(event.data);
if (!data || typeof data !== 'object') {
throw new Error('Invalid message format');
}
} catch (error) {
console.error('Failed to parse message:', error);
return;
}
if (data.type === "authenticate") {

Comment on lines +98 to +111
this.reconnectTimer = setTimeout(async () => {
try {
if (this.ws) {
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add WebSocket cleanup before reconnection

The reconnection attempt should properly close and clean up the existing WebSocket connection before creating a new one.

 this.reconnectTimer = setTimeout(async () => {
   try {
     if (this.ws) {
+      if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
+        this.ws.close();
+      }
+      // Wait for the connection to fully close
+      await new Promise(resolve => setTimeout(resolve, 100));
+
       const originalUrl = new URL(this.ws.url);
       const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
       const roomId = originalUrl.pathname.slice(1);

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (3)
ui/src/lib/yjs/types.ts (3)

12-21: Consider strengthening type safety for the data field

The data field is typed as an optional string, but it might contain structured data for different event types. Consider using a generic type parameter or specific data structures based on the event tag to ensure type safety at compile time.

export type FlowMessage<T = unknown> = {
  event: {
    tag: "Create" | "Join" | "Leave" | "Emit";
    content: {
      room_id?: string;
      data?: T;  // Generic type parameter for better type safety
    };
  };
  session_command?: SessionCommand;
};

45-50: Add input validation for robustness

The function should validate inputs to prevent potential issues:

  1. Ensure data is not null/empty
  2. Validate that the message type value is within enum bounds
export function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
  if (!data?.length) {
    throw new Error("Data cannot be empty");
  }
  if (!Object.values(MessageType).includes(type)) {
    throw new Error("Invalid message type");
  }
  const message = new Uint8Array(data.length + 1);
  message[0] = type;
  message.set(data, 1);
  return message;
}

Line range hint 1-50: Consider error handling and sync strategy documentation

The types suggest a custom WebSocket integration with Yjs. To align with the PR objectives:

  1. Consider adding error-specific types for consistent error handling across the application
  2. Document the sync strategy (e.g., when SYNC vs UPDATE messages are used)
  3. Consider adding JSDoc comments explaining the integration with the Rust backend
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 839ad71 and 596f47c.

📒 Files selected for processing (2)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
  • ui/src/lib/yjs/types.ts (2 hunks)
🔇 Additional comments (2)
ui/src/lib/yjs/types.ts (1)

40-43: LGTM!

The MessageType enum is well-defined with clear numeric values for binary message type identification.

ui/src/lib/yjs/socketYjsManager.ts (1)

1-283: Verify backend compatibility with the WebSocket protocol

The implementation assumes specific backend behavior for room creation, joining, and synchronization. Let's verify the protocol compatibility.

✅ Verification successful

Based on the search results, I can now generate a final response as we have sufficient evidence about the backend implementation:

Backend WebSocket protocol is compatible with the client implementation

The backend implementation in the websocket directory confirms compatibility with the client's protocol:

  • Room management matches client expectations (Create, Join, Emit events)
  • Message types and flow handling are consistent
  • WebSocket handlers properly handle binary updates and JSON messages

Key matching components:

  • websocket/app/src/handlers/types.rs defines the same FlowMessage structure
  • websocket/app/src/handlers/room_handler.rs implements the room lifecycle events
  • Binary message handling for Yjs updates is supported through the WebSocket infrastructure
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check backend WebSocket protocol implementation

# Look for WebSocket protocol documentation
echo "Searching for WebSocket protocol documentation..."
rg -l "websocket|WebSocket" -g "*.md"

# Look for backend WebSocket handler implementations
echo "Searching for backend WebSocket handlers..."
rg -l "tag: \"(Create|Join|Emit)\"" --type rust

Length of output: 293


Script:

#!/bin/bash
# Let's try a broader search for WebSocket-related code and documentation

# Search for WebSocket-related files
echo "=== Searching for WebSocket-related files ==="
fd -t f -e rs -e toml . | xargs rg -l "websocket|WebSocket"

# Search for Flow message types and handlers
echo -e "\n=== Searching for Flow message handling ==="
rg "enum.*Flow.*|struct.*Flow.*" --type rust -A 5

# Search for room management code
echo -e "\n=== Searching for room management code ==="
rg "room_id|RoomId" --type rust -A 3

# Look for protocol types
echo -e "\n=== Searching for protocol types ==="
rg "type.*Message|enum.*Message" --type rust -A 5

Length of output: 15685

Comment on lines +23 to +38
export type SessionCommand = {
tag:
| "Start"
| "End"
| "Complete"
| "CheckStatus"
| "AddTask"
| "RemoveTask"
| "ListAllSnapshotsVersions"
| "MergeUpdates";
content: {
project_id?: string;
user?: User;
data?: Uint8Array;
};
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance type safety and data consistency

Several observations:

  1. Some fields might be required depending on the command tag
  2. The data field type (Uint8Array) differs from FlowMessage's data field (string)

Consider using discriminated unions and consistent data types:

export type SessionCommand = {
  tag: "CheckStatus" | "ListAllSnapshotsVersions";  // Commands without required fields
  content: {
    project_id?: string;
    user?: User;
    data?: Uint8Array;
  };
} | {
  tag: "Start" | "End" | "Complete" | "AddTask" | "RemoveTask" | "MergeUpdates";  // Commands requiring project and user
  content: {
    project_id: string;  // Required
    user: User;  // Required
    data?: Uint8Array;
  };
};

Comment on lines 161 to 192
protected async initializeRoom() {
try {
await this.sendFlowMessage({
event: {
tag: "Create",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Join",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "Start",
content: { project_id: this.projectId },
},
});

await this.syncData();
} catch (error) {
console.error("Failed to initialize room:", error);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add protection against race conditions in room initialization

The sequential flow messages could be affected by network delays or connection issues, potentially leading to race conditions.

 protected async initializeRoom() {
+  if (!this.socketReady) {
+    throw new Error('Socket not ready for room initialization');
+  }
+
   try {
+    const roomId = this.doc.clientID.toString();
+    
     await this.sendFlowMessage({
       event: {
         tag: "Create",
-        content: { room_id: this.doc.clientID.toString() },
+        content: { room_id: roomId },
       },
     });

     await this.sendFlowMessage({
       event: {
         tag: "Join",
-        content: { room_id: this.doc.clientID.toString() },
+        content: { room_id: roomId },
       },
     });

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +41 to +73
// Replace the setupSocket method
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;

try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;

// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers

this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";

this.setupWebSocketListeners();
this.setupDocListeners();

console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add input validation for setupSocket parameters

The method lacks validation for required parameters. URL construction should be more robust.

 async setupSocket(data: {
   url: string;
   roomId: string;
   projectId: string;
   accessTokenProvider: AccessTokenProvider;
 }) {
+  // Validate input parameters
+  if (!data.url || !data.roomId || !data.projectId) {
+    throw new Error('Required parameters missing');
+  }
+  
+  // Validate URL format
+  try {
+    new URL(data.url);
+  } catch (error) {
+    throw new Error('Invalid URL format');
+  }

   this.accessTokenProvider = data.accessTokenProvider;
   this.projectId = data.projectId;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Replace the setupSocket method
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
}
// Replace the setupSocket method
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
// Validate input parameters
if (!data.url || !data.roomId || !data.projectId) {
throw new Error('Required parameters missing');
}
// Validate URL format
try {
new URL(data.url);
} catch (error) {
throw new Error('Invalid URL format');
}
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;
try {
const token = await this.accessTokenProvider();
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
console.error("Failed to setup WebSocket:", error);
throw error;
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)

17-19: Consider making reconnection parameters configurable

The reconnection parameters are hardcoded, which limits flexibility for different network conditions or requirements.

Consider making these parameters configurable through the constructor:

-  protected reconnectAttempts = 0;
-  protected maxReconnectAttempts = 5;
-  protected reconnectDelay = 1000;
+  protected reconnectAttempts = 0;
+  constructor(
+    doc: Y.Doc,
+    protected maxReconnectAttempts = 5,
+    protected reconnectDelay = 1000
+  ) {
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 596f47c and 9055fa2.

📒 Files selected for processing (2)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
  • ui/src/lib/yjs/types.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • ui/src/lib/yjs/types.ts

Comment on lines 282 to 287
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation to createBinaryMessage

The function doesn't validate the MessageType enum value or the data array.

Add validation to prevent invalid message creation:

 function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+  if (!Object.values(MessageType).includes(type)) {
+    throw new Error(`Invalid message type: ${type}`);
+  }
+  if (!data || !(data instanceof Uint8Array)) {
+    throw new Error('Invalid data: must be a Uint8Array');
+  }
   const message = new Uint8Array(data.length + 1);
   message[0] = type;
   message.set(data, 1);
   return message;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
if (!Object.values(MessageType).includes(type)) {
throw new Error(`Invalid message type: ${type}`);
}
if (!data || !(data instanceof Uint8Array)) {
throw new Error('Invalid data: must be a Uint8Array');
}
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}

Comment on lines +95 to +96
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid silent failures with empty projectId

Using empty strings as fallbacks for required parameters could lead to silent failures.

Consider throwing an error instead:

-              projectId: this.projectId || "",
-              accessTokenProvider: this.accessTokenProvider || (() => ""),
+              projectId: this.projectId ?? (() => {
+                throw new Error("Project ID is required for reconnection");
+              })(),
+              accessTokenProvider: this.accessTokenProvider ?? (() => {
+                throw new Error("Access token provider is required for reconnection");
+              }),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
projectId: this.projectId ?? (() => {
throw new Error("Project ID is required for reconnection");
})(),
accessTokenProvider: this.accessTokenProvider ?? (() => {
throw new Error("Access token provider is required for reconnection");
}),

Comment on lines 113 to 115
private setupDocListeners() {
this.doc.on("update", this.onDocUpdate);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add cleanup for doc listeners

The document update listener is not removed when the instance is destroyed, which could lead to memory leaks.

Add cleanup in the destroy method:

 private setupDocListeners() {
   this.doc.on("update", this.onDocUpdate);
 }
+
+private cleanupDocListeners() {
+  this.doc.off("update", this.onDocUpdate);
+}

And in the destroy method:

 destroy() {
+  this.cleanupDocListeners();
   if (this.reconnectTimer) {

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 215 to 230
async syncData() {
await this.isReady();

const currentState = Y.encodeStateAsUpdateV2(this.doc);
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState);

if (this.ws.readyState === WebSocket.OPEN) {
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
this.ws.send(syncMessage);
}

if (!this.firstSyncComplete) {
this.firstSyncComplete = true;
queueMicrotask(() => this.syncData());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent parallel sync operations

Multiple calls to syncData could run in parallel, potentially causing race conditions.

Add a lock mechanism to prevent parallel sync operations:

+  private syncInProgress = false;
+
   async syncData() {
+    if (this.syncInProgress) {
+      return;
+    }
+    this.syncInProgress = true;
+    try {
       await this.isReady();
 
       const currentState = Y.encodeStateAsUpdateV2(this.doc);
       const stateVector = Y.encodeStateVectorFromUpdateV2(currentState);
       
       if (this.ws.readyState === WebSocket.OPEN) {
         const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
         this.ws.send(syncMessage);
       }
 
       if (!this.firstSyncComplete) {
         this.firstSyncComplete = true;
         queueMicrotask(() => this.syncData());
       }
+    } finally {
+      this.syncInProgress = false;
+    }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async syncData() {
await this.isReady();
const currentState = Y.encodeStateAsUpdateV2(this.doc);
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState);
if (this.ws.readyState === WebSocket.OPEN) {
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
this.ws.send(syncMessage);
}
if (!this.firstSyncComplete) {
this.firstSyncComplete = true;
queueMicrotask(() => this.syncData());
}
}
private syncInProgress = false;
async syncData() {
if (this.syncInProgress) {
return;
}
this.syncInProgress = true;
try {
await this.isReady();
const currentState = Y.encodeStateAsUpdateV2(this.doc);
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState);
if (this.ws.readyState === WebSocket.OPEN) {
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
this.ws.send(syncMessage);
}
if (!this.firstSyncComplete) {
this.firstSyncComplete = true;
queueMicrotask(() => this.syncData());
}
} finally {
this.syncInProgress = false;
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (3)
ui/src/lib/yjs/socketYjsManager.ts (3)

69-72: Enhance error handling in setupSocket

The catch block logs and re-throws the error without providing context about which part of the setup failed.

Consider this improvement:

-    } catch (error) {
-      console.error("Failed to setup WebSocket:", error);
-      throw error;
+    } catch (error) {
+      const phase = !this.accessTokenProvider ? "token provider"
+        : !this.projectId ? "project ID"
+        : "WebSocket connection";
+      console.error(`WebSocket setup failed during ${phase}:`, error);
+      throw new Error(`Failed to setup WebSocket (${phase}): ${error instanceof Error ? error.message : String(error)}`);
     }

129-132: Improve error handling in onConnectionError

The error handler provides minimal context and immediately triggers reconnection.

Consider this improvement:

-  protected onConnectionError(error: Event) {
-    console.error("WebSocket error:", error);
-    this.reconnect();
+  protected onConnectionError(error: Event) {
+    const wsState = this.ws?.readyState;
+    const stateMap = {
+      [WebSocket.CONNECTING]: 'CONNECTING',
+      [WebSocket.OPEN]: 'OPEN',
+      [WebSocket.CLOSING]: 'CLOSING',
+      [WebSocket.CLOSED]: 'CLOSED',
+    };
+    console.error(`WebSocket error (state: ${stateMap[wsState ?? -1] ?? 'UNKNOWN'}):`, error);
+    if (wsState !== WebSocket.CLOSING && wsState !== WebSocket.CLOSED) {
+      this.reconnect();
+    }
   }

377-392: Consider using debug level logging

The extensive logging in createBinaryMessage should be behind a debug flag to avoid performance impact in production.

Consider this improvement:

+const DEBUG = process.env.NODE_ENV !== 'production';
+
 function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
-    console.log("Creating binary message:", {
+    DEBUG && console.log("Creating binary message:", {
         type,
         dataLength: data.length,
         rawData: Array.from(data)
     });
     
     const message = new Uint8Array(data.length + 1);
     message[0] = type;
     message.set(data, 1);
     
-    console.log("Created message:", {
+    DEBUG && console.log("Created message:", {
         messageLength: message.length,
         messageType: message[0],
         rawMessage: Array.from(message)
     });
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between c08e26d and 53ea438.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)

this.onUpdateHandlers.push(handler);
}

protected async destroy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Change destroy method visibility to public

The destroy method is marked as protected but should be public as it's needed for cleanup when the manager is no longer needed.

-  protected async destroy() {
+  public async destroy() {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected async destroy() {
public async destroy() {

Comment on lines +9 to +21
export class SocketYjsManager {
protected ws!: WebSocket;
protected doc: Y.Doc;
protected socketReady = false;
protected firstSyncComplete = false;
protected accessTokenProvider: AccessTokenProvider | undefined;
protected projectId: string | undefined;
protected onUpdateHandlers: ((update: Uint8Array) => void)[] = [];
protected reconnectAttempts = 0;
protected maxReconnectAttempts = 5;
protected reconnectDelay = 1000;
protected reconnectTimer: ReturnType<typeof setTimeout> | null = null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider revising field visibility and optional types

The class has several issues in its field declarations:

  1. Protected visibility seems unnecessary for internal implementation
  2. Optional fields (accessTokenProvider and projectId) are used throughout the code without proper null checks

Consider this improvement:

-export class SocketYjsManager {
-  protected ws!: WebSocket;
-  protected doc: Y.Doc;
-  protected socketReady = false;
-  protected firstSyncComplete = false;
-  protected accessTokenProvider: AccessTokenProvider | undefined;
-  protected projectId: string | undefined;
-  protected onUpdateHandlers: ((update: Uint8Array) => void)[] = [];
+export class SocketYjsManager {
+  private ws!: WebSocket;
+  private doc: Y.Doc;
+  private socketReady = false;
+  private firstSyncComplete = false;
+  private accessTokenProvider!: AccessTokenProvider;
+  private projectId!: string;
+  private onUpdateHandlers: ((update: Uint8Array) => void)[] = [];

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)

17-19: Consider making reconnection parameters configurable

The reconnection parameters (attempts and delay) are hardcoded. Consider making these configurable through constructor options to allow different retry strategies for different environments.

-  protected reconnectAttempts = 0;
-  protected maxReconnectAttempts = 5;
-  protected reconnectDelay = 1000;
+  protected reconnectAttempts = 0;
+  protected maxReconnectAttempts: number;
+  protected reconnectDelay: number;
+
+  constructor(doc: Y.Doc, options: {
+    maxReconnectAttempts?: number;
+    reconnectDelay?: number;
+  } = {}) {
+    this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
+    this.reconnectDelay = options.reconnectDelay ?? 1000;

230-272: Add logging control mechanism

The extensive debug logging could impact performance and pollute logs in production.

+  protected debugLogging = false;
+
+  private log(...args: unknown[]) {
+    if (this.debugLogging) {
+      console.log(...args);
+    }
+  }
+
   // Then replace console.log calls with this.log
-    console.log("Received peer update:", {
+    this.log("Received peer update:", {
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 53ea438 and 06f2268.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)

Comment on lines +143 to +147
if (event.data instanceof ArrayBuffer) {
// Handle binary message (Yjs update)
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add binary message validation

The binary message handling lacks validation of the message format and size.

       if (event.data instanceof ArrayBuffer) {
+        if (event.data.byteLength < 1) {
+          throw new Error('Invalid binary message: empty message');
+        }
+        if (event.data.byteLength > 1024 * 1024) { // 1MB limit
+          throw new Error('Invalid binary message: message too large');
+        }
         const update = new Uint8Array(event.data);
+        const messageType = update[0];
+        if (!Object.values(MessageType).includes(messageType)) {
+          throw new Error(`Invalid binary message: unknown message type ${messageType}`);
+        }
         await this.onPeerUpdate({ update });

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +323 to +325
onUpdate(handler: (update: Uint8Array) => void) {
this.onUpdateHandlers.push(handler);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for update handlers

The update handler execution doesn't handle errors, which could prevent subsequent handlers from running.

   onUpdate(handler: (update: Uint8Array) => void) {
-    this.onUpdateHandlers.push(handler);
+    const wrappedHandler = (update: Uint8Array) => {
+      try {
+        handler(update);
+      } catch (error) {
+        console.error('Error in update handler:', error);
+      }
+    };
+    this.onUpdateHandlers.push(wrappedHandler);
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
onUpdate(handler: (update: Uint8Array) => void) {
this.onUpdateHandlers.push(handler);
}
onUpdate(handler: (update: Uint8Array) => void) {
const wrappedHandler = (update: Uint8Array) => {
try {
handler(update);
} catch (error) {
console.error('Error in update handler:', error);
}
};
this.onUpdateHandlers.push(wrappedHandler);
}

Comment on lines +76 to +104
private reconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}

if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay =
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

this.reconnectTimer = setTimeout(async () => {
try {
if (this.ws) {
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
await this.setupSocket({
url: baseUrl,
roomId,
projectId: this.projectId || "",
accessTokenProvider: this.accessTokenProvider || (() => ""),
});
}
} catch (error) {
console.error("Reconnection failed:", error);
}
}, delay);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add connection state tracking

The reconnection logic could benefit from explicit state tracking to prevent race conditions and multiple simultaneous reconnection attempts.

+  private connectionState: 'disconnected' | 'connecting' | 'connected' = 'disconnected';
+
   private reconnect() {
+    if (this.connectionState === 'connecting') {
+      return; // Prevent multiple simultaneous reconnection attempts
+    }
+
     if (this.reconnectTimer) {
       clearTimeout(this.reconnectTimer);
     }
+    this.connectionState = 'connecting';

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +62 to +64
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add connection timeout handling

The WebSocket connection attempt lacks a timeout, which could leave the application hanging if the server is unresponsive.

+      const connectionTimeout = setTimeout(() => {
+        if (this.ws.readyState === WebSocket.CONNECTING) {
+          this.ws.close();
+          throw new Error('WebSocket connection timeout');
+        }
+      }, 10000); // 10 seconds timeout
+
       this.ws = new WebSocket(wsUrl.href);
       this.ws.binaryType = "arraybuffer";
+
+      this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
const connectionTimeout = setTimeout(() => {
if (this.ws.readyState === WebSocket.CONNECTING) {
this.ws.close();
throw new Error('WebSocket connection timeout');
}
}, 10000); // 10 seconds timeout
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });

Comment on lines +161 to +213
protected async initializeRoom() {
try {
await this.sendFlowMessage({
event: {
tag: "Create",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Join",
content: { room_id: this.doc.clientID.toString() },
},
});

await this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "Start",
content: {
project_id: this.projectId,
user: {
id: this.doc.clientID.toString(),
name: "",
email: "",
tenant_id: "123"
}
},
},
});

await this.sendFlowMessage({
event: {
tag: "Emit",
content: { data: "" },
},
session_command: {
tag: "AddTask",
content: {
project_id: this.projectId,
},
},
});

await this.syncData();
} catch (error) {
console.error("Failed to initialize room:", error);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add timeout for room initialization

The room initialization process could hang indefinitely if any of the steps fail silently.

   protected async initializeRoom() {
+    const initTimeout = setTimeout(() => {
+      throw new Error('Room initialization timeout after 30 seconds');
+    }, 30000);
+
     try {
       await this.sendFlowMessage({
         // ... existing code ...
       });
+    } finally {
+      clearTimeout(initTimeout);
     }
   }

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)

17-19: Make reconnection parameters configurable

The reconnection parameters are hardcoded. Consider making them configurable through constructor options to support different use cases and environments.

Apply this diff:

+interface SocketYjsManagerOptions {
+  maxReconnectAttempts?: number;
+  reconnectDelay?: number;
+}

 export class SocketYjsManager {
-  protected reconnectAttempts = 0;
-  protected maxReconnectAttempts = 5;
-  protected reconnectDelay = 1000;
+  private reconnectAttempts = 0;
+  private maxReconnectAttempts: number;
+  private reconnectDelay: number;

-  constructor(doc: Y.Doc) {
+  constructor(doc: Y.Doc, options: SocketYjsManagerOptions = {}) {
     this.doc = doc;
+    this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
+    this.reconnectDelay = options.reconnectDelay ?? 1000;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 06f2268 and d452e3e.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
🔇 Additional comments (5)
ui/src/lib/yjs/socketYjsManager.ts (5)

376-381: 🛠️ Refactor suggestion

Add input validation to createBinaryMessage

The binary message creation lacks validation for the input parameters.

Apply this diff:

 function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+    if (!Object.values(MessageType).includes(type)) {
+      throw new Error(`Invalid message type: ${type}`);
+    }
+    if (!(data instanceof Uint8Array)) {
+      throw new Error('Data must be a Uint8Array');
+    }
     const message = new Uint8Array(data.length + 1);
     message[0] = type;
     message.set(data, 1);
     return message;
 }

Likely invalid or redundant comment.


185-191: ⚠️ Potential issue

Remove hardcoded user information

The user information contains hardcoded empty values and a static tenant ID, which could cause issues in production.

Add proper user information handling:

+interface UserInfo {
+  name: string;
+  email: string;
+  tenant_id: string;
+}

 export class SocketYjsManager {
+  private userInfo: UserInfo;

-  constructor(doc: Y.Doc) {
+  constructor(doc: Y.Doc, userInfo: UserInfo) {
     this.doc = doc;
+    this.userInfo = userInfo;

     // ... rest of constructor
   }

   // In initializeRoom:
-            user: {
-              id: this.doc.clientID.toString(),
-              name: "",
-              email: "",
-              tenant_id: "123"
-            }
+            user: {
+              id: this.doc.clientID.toString(),
+              ...this.userInfo
+            }

Likely invalid or redundant comment.


14-15: ⚠️ Potential issue

Make required fields non-optional

The accessTokenProvider and projectId fields are marked as optional but are required for the WebSocket connection to work. This could lead to runtime errors.

Apply this diff:

-  protected accessTokenProvider: AccessTokenProvider | undefined;
-  protected projectId: string | undefined;
+  private accessTokenProvider!: AccessTokenProvider;
+  private projectId!: string;

Likely invalid or redundant comment.


215-219: ⚠️ Potential issue

Replace recursive isReady with async polling

The recursive implementation of isReady could cause stack overflow for long waiting periods.

Apply this diff:

   async isReady(): Promise<boolean> {
-    if (this.socketReady) return true;
-    await sleep(100);
-    return await this.isReady();
+    const maxAttempts = 100; // 10 seconds total
+    let attempts = 0;
+    while (!this.socketReady && attempts < maxAttempts) {
+      await sleep(100);
+      attempts++;
+    }
+    if (!this.socketReady) {
+      throw new Error('Connection timeout waiting for ready state');
+    }
+    return true;
   }

Likely invalid or redundant comment.


58-60: ⚠️ Potential issue

Security: Move sensitive data from URL parameters to headers

Passing sensitive information like user_id, project_id, and token in URL parameters exposes them to logs and potential security risks.

Consider using a more secure approach:

-      wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
-      wsUrl.searchParams.set("project_id", data.projectId);
-      wsUrl.searchParams.set("token", token);
+      const headers = {
+        'X-User-ID': this.doc.clientID.toString(),
+        'X-Project-ID': data.projectId,
+        'Authorization': `Bearer ${token}`
+      };
+      
+      // Note: Since WebSocket API doesn't support custom headers directly,
+      // consider implementing a handshake endpoint that returns a secure
+      // WebSocket URL after validating the credentials

Likely invalid or redundant comment.

Comment on lines +230 to +233
console.log("Received peer update:", {
updateLength: update.length,
rawUpdate: Array.from(update)
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add conditional debug logging

The code contains extensive debug logging that should be conditional to avoid performance impact in production.

Add a debug mode flag and wrap all debug logs:

+  private debugMode = false;
+
+  private debugLog(message: string, data?: unknown) {
+    if (this.debugMode) {
+      console.log(message, data);
+    }
+  }

   protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) {
-    console.log("Received peer update:", {
+    this.debugLog("Received peer update:", {

Also applies to: 236-239, 242-245, 255-258, 261-264, 268-272, 291-296, 308-321

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)

17-20: Make reconnection parameters configurable

The reconnection parameters are hardcoded. Consider making them configurable through the constructor to allow different retry strategies for different environments.

-  protected reconnectAttempts = 0;
-  protected maxReconnectAttempts = 5;
-  protected reconnectDelay = 1000;
+  protected reconnectAttempts = 0;
+  constructor(
+    doc: Y.Doc,
+    private config: {
+      maxReconnectAttempts?: number;
+      reconnectDelay?: number;
+    } = {}
+  ) {
+    this.maxReconnectAttempts = config.maxReconnectAttempts ?? 5;
+    this.reconnectDelay = config.reconnectDelay ?? 1000;

25-34: Use class fields with arrow functions to avoid manual binding

The manual method binding in the constructor can be eliminated by using class fields with arrow functions, making the code more maintainable and less prone to binding errors.

-    // Bind methods
-    this.onConnectionEstablished = this.onConnectionEstablished.bind(this);
-    this.onConnectionDisconnect = this.onConnectionDisconnect.bind(this);
-    this.onConnectionError = this.onConnectionError.bind(this);
-    this.onAuthenticateRequest = this.onAuthenticateRequest.bind(this);
-    this.onDocUpdate = this.onDocUpdate.bind(this);
-    this.onReady = this.onReady.bind(this);
-    this.onPeerUpdate = this.onPeerUpdate.bind(this);
-    this.handleMessage = this.handleMessage.bind(this);
-    this.reconnect = this.reconnect.bind(this);
+    // Methods will be automatically bound when defined as class fields
+    protected onConnectionEstablished = () => {
+      this.reconnectAttempts = 0;
+      this.socketReady = true;
+      this.initializeRoom().catch(console.error);
+    };
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between d452e3e and 247877e.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
🔇 Additional comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)

369-374: 🛠️ Refactor suggestion

Add size validation for binary messages

The binary message creation should validate the input data size to prevent memory issues with large updates.

 function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+    const MAX_MESSAGE_SIZE = 1024 * 1024; // 1MB limit
+    if (data.length > MAX_MESSAGE_SIZE) {
+      throw new Error(`Message size exceeds limit: ${data.length} bytes`);
+    }
     const message = new Uint8Array(data.length + 1);
     message[0] = type;
     message.set(data, 1);
     return message;
 }

Likely invalid or redundant comment.

Comment on lines +53 to +55
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add URL protocol validation

The URL protocol replacement assumes HTTP/HTTPS protocols. Add validation to ensure only supported protocols are processed.

       const wsUrl = new URL(data.url);
+      if (!wsUrl.protocol.startsWith('http')) {
+        throw new Error('URL protocol must be HTTP or HTTPS');
+      }
       wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
       wsUrl.pathname = `/${data.roomId}`;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
const wsUrl = new URL(data.url);
if (!wsUrl.protocol.startsWith('http')) {
throw new Error('URL protocol must be HTTP or HTTPS');
}
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;

Comment on lines +129 to +132
protected onConnectionError(error: Event) {
console.error("WebSocket error:", error);
this.reconnect();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error event typing

The error event parameter should use the specific ErrorEvent type instead of the generic Event type.

-  protected onConnectionError(error: Event) {
+  protected onConnectionError(error: ErrorEvent) {
     console.error("WebSocket error:", error);
+    console.error("Error details:", error.error);
     this.reconnect();
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected onConnectionError(error: Event) {
console.error("WebSocket error:", error);
this.reconnect();
}
protected onConnectionError(error: ErrorEvent) {
console.error("WebSocket error:", error);
console.error("Error details:", error.error);
this.reconnect();
}

Comment on lines +259 to +267
if (this.ws.readyState === WebSocket.OPEN) {
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
console.log("Sending sync message:", {
messageLength: syncMessage.length,
messageType: syncMessage[0],
rawMessage: Array.from(syncMessage)
});
this.ws.send(syncMessage);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle WebSocket state changes during sync

The WebSocket state should be checked both before and after preparing the sync message to handle potential disconnections.

-    if (this.ws.readyState === WebSocket.OPEN) {
+    const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
+    if (this.ws.readyState !== WebSocket.OPEN) {
+      throw new Error('WebSocket disconnected while preparing sync message');
+    }
+    try {
       const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
       console.log("Sending sync message:", {
           messageLength: syncMessage.length,
           messageType: syncMessage[0],
           rawMessage: Array.from(syncMessage)
       });
       this.ws.send(syncMessage);
+    } catch (error) {
+      this.socketReady = false;
+      throw error;
     }

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 247877e and 3940df9.

📒 Files selected for processing (1)
  • ui/src/lib/yjs/socketYjsManager.ts (1 hunks)
🔇 Additional comments (6)
ui/src/lib/yjs/socketYjsManager.ts (6)

325-325: ⚠️ Potential issue

Change destroy method visibility to public.

The destroy method should be public as it's needed for cleanup when the manager is no longer needed.

-  protected async destroy() {
+  public async destroy() {

Likely invalid or redundant comment.


83-84: 🛠️ Refactor suggestion

Add maximum delay cap to exponential backoff.

The exponential backoff delay grows unbounded, which could lead to very long reconnection delays.

Add a maximum delay cap:

+      const maxDelay = 30000; // 30 seconds maximum delay
       const delay = Math.min(
         this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
+        maxDelay
       );

Likely invalid or redundant comment.


215-219: ⚠️ Potential issue

Refactor 'isReady' method to eliminate recursion.

The recursive implementation of isReady can lead to a stack overflow for prolonged waiting periods.

Refactor to use a loop:

  async isReady(): Promise<boolean> {
-   if (this.socketReady) return true;
-   await sleep(100);
-   return await this.isReady();
+   while (!this.socketReady) {
+     await sleep(100);
+   }
+   return true;
  }

Likely invalid or redundant comment.


370-375: 🛠️ Refactor suggestion

Add input validation to createBinaryMessage.

The function doesn't validate the MessageType enum value or the data array.

Add validation:

 function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+  if (!Object.values(MessageType).includes(type)) {
+    throw new Error(`Invalid message type: ${type}`);
+  }
+  if (!data || !(data instanceof Uint8Array)) {
+    throw new Error('Invalid data: must be a Uint8Array');
+  }
   const message = new Uint8Array(data.length + 1);
   message[0] = type;
   message.set(data, 1);
   return message;
 }

Likely invalid or redundant comment.


143-147: 🛠️ Refactor suggestion

Add binary message validation.

The binary message handling lacks validation of the message format and size.

Add proper validation:

       if (event.data instanceof ArrayBuffer) {
+        if (event.data.byteLength < 1) {
+          throw new Error('Invalid binary message: empty message');
+        }
+        if (event.data.byteLength > 1024 * 1024) { // 1MB limit
+          throw new Error('Invalid binary message: message too large');
+        }
         const update = new Uint8Array(event.data);

Likely invalid or redundant comment.


62-64: 🛠️ Refactor suggestion

Add connection timeout handling.

The WebSocket connection attempt lacks a timeout, which could leave the application hanging if the server is unresponsive.

Add a timeout mechanism:

+      const connectionTimeout = setTimeout(() => {
+        if (this.ws.readyState === WebSocket.CONNECTING) {
+          this.ws.close();
+          throw new Error('WebSocket connection timeout');
+        }
+      }, 10000); // 10 seconds timeout
+
       this.ws = new WebSocket(wsUrl.href);
       this.ws.binaryType = "arraybuffer";
+
+      this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });

Likely invalid or redundant comment.

Comment on lines +58 to +60
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Security: Move sensitive data from URL parameters to headers.

Exposing sensitive information like tokens and IDs in URL parameters is a security risk as they can be logged or exposed in various ways.

Consider using a secure WebSocket handshake protocol or moving authentication to a dedicated message after connection.

Comment on lines +185 to +191
project_id: this.projectId,
user: {
id: this.doc.clientID.toString(),
name: "",
email: "",
tenant_id: "123"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove hardcoded user information.

Using hardcoded user information is a security risk and could lead to confusion in production.

Add proper user information handling:

-            user: {
-              id: this.doc.clientID.toString(),
-              name: "",
-              email: "",
-              tenant_id: "123"
-            }
+            user: this.userInfo

Add user information to the constructor:

constructor(
  doc: Y.Doc,
  private userInfo: {
    id: string;
    tenant_id?: string;
    name: string;
    email: string;
  }
) {
  this.doc = doc;
  // ... rest of the constructor
}

Comment on lines +14 to +15
protected accessTokenProvider: AccessTokenProvider | undefined;
protected projectId: string | undefined;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Make required fields non-optional or add proper null checks.

The accessTokenProvider and projectId fields are marked as optional but are used throughout the code without proper null checks. This could lead to runtime errors.

Apply this diff:

-  protected accessTokenProvider: AccessTokenProvider | undefined;
-  protected projectId: string | undefined;
+  private accessTokenProvider!: AccessTokenProvider;
+  private projectId!: string;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected accessTokenProvider: AccessTokenProvider | undefined;
protected projectId: string | undefined;
private accessTokenProvider!: AccessTokenProvider;
private projectId!: string;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants