-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ui): socket yjs manager & refactor #649
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThe changes in this pull request involve updates to several localization files for multiple languages, including English, Spanish, French, Japanese, and Chinese. New keys have been added to these files to support localization for various user interface messages, particularly focusing on connection errors and other UI elements. Additionally, a new utility function for handling delays has been introduced, along with significant enhancements to the WebSocket management for real-time collaboration, including a new class and updated types. Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
❌ Deploy Preview for reearth-flow failed.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 11
🧹 Outside diff range and nitpick comments (10)
ui/src/lib/utils.ts (1)
9-11
: LGTM! Consider adding input validation and documentation.The sleep function implementation is clean and follows standard practices. However, consider these improvements:
+/** + * Delays execution for the specified number of milliseconds. + * @param ms The number of milliseconds to sleep + * @throws {Error} If ms is negative + */ export function sleep(ms: number) { + if (ms < 0) throw new Error("Sleep duration cannot be negative"); return new Promise((resolve) => setTimeout(resolve, ms)); }ui/src/lib/yjs/types.ts (1)
23-39
: Add JSDoc comments to document command purposes and required fieldsThe SessionCommand type includes various commands that might have different requirements and purposes. Consider adding documentation to clarify their usage.
Example documentation:
/** * Commands for managing collaborative sessions. * @property tag - The type of command to execute * @property content - Command-specific data */ export type SessionCommand = { tag: | "Start" // Initiates a new session | "End" // Terminates an active session | "Complete" // Marks a session as completed // ... document other commandsConsider strengthening type safety for specific commands
Different commands might require different fields in their content. Consider using discriminated unions to enforce these requirements.
Example refactor:
type BaseContent = { project_id: string; }; type StartContent = BaseContent & { user: User; // Required for session start }; export type SessionCommand = { tag: "Start"; content: StartContent; } | { tag: "MergeUpdates"; content: BaseContent & { data: Uint8Array; // Required for updates updated_by: string; // Required for tracking }; } | { tag: "End" | "Complete" | "CheckStatus" | "AddTask" | "RemoveTask" | "ListAllSnapshotsVersions"; content: BaseContent & { user?: User; data?: Uint8Array; updated_by?: string; }; };ui/src/lib/i18n/locales/zh.json (2)
Line range hint
6-6
: Add missing Chinese translations for UI elements.The following keys have empty translations which will result in English text being shown to Chinese users:
- "Schema not found"
- "Submit"
- "3D visualization is not supported in this browser"
- "2D"
- "3D"
- "Manage Projects"
- "Connection Error"
Please provide appropriate Chinese translations for these UI elements to ensure a consistent user experience.
Also applies to: 7-7, 11-11, 12-12, 13-13, 81-81, 228-228
Line range hint
124-124
: Review the "Trigger1" key naming.The key "Trigger1" appears to be a development placeholder. Consider using a more descriptive name that reflects its purpose (e.g., "TriggerManual", "TriggerScheduled", etc.).
ui/src/lib/i18n/locales/es.json (1)
Line range hint
11-11
: Add missing Spanish translations for UI elementsSeveral new UI strings have been added without Spanish translations. This could impact the user experience for Spanish-speaking users.
Here are suggested translations for review:
- "3D visualization is not supported in this browser": "", + "3D visualization is not supported in this browser": "La visualización 3D no es compatible con este navegador", - "2D": "", + "2D": "2D", - "3D": "", + "3D": "3D", - "Choose action": "", + "Choose action": "Elegir acción", - "Subworkflow Node": "", + "Subworkflow Node": "Nodo de subflujo de trabajo", - "Connection Error": "", + "Connection Error": "Error de conexión"Also applies to: 12-13, 21-21, 39-39, 76-76, 228-228
ui/src/lib/i18n/locales/fr.json (4)
Line range hint
6-6
: Add missing French translations for new keysThe following keys are missing their French translations:
- "Schema not found" → Suggest: "Schéma non trouvé"
- "3D visualization is not supported in this browser" → Suggest: "La visualisation 3D n'est pas prise en charge dans ce navigateur"
- "Subworkflow Node" → Suggest: "Nœud de sous-flux"
- "Manage Projects" → Suggest: "Gérer les projets"
- "Trigger1" → Suggest: "Déclencheur1"
- "Connection Error" → Suggest: "Erreur de connexion"
Apply this diff to add the missing translations:
- "Schema not found": "", + "Schema not found": "Schéma non trouvé", - "3D visualization is not supported in this browser": "", + "3D visualization is not supported in this browser": "La visualisation 3D n'est pas prise en charge dans ce navigateur", - "Subworkflow Node": "", + "Subworkflow Node": "Nœud de sous-flux", - "Manage Projects": "", + "Manage Projects": "Gérer les projets", - "Trigger1": "", + "Trigger1": "Déclencheur1", - "Connection Error": "", + "Connection Error": "Erreur de connexion",Also applies to: 11-11, 19-19, 73-73, 147-147, 228-228
Line range hint
12-13
: Maintain consistent translation style for dimension labelsThe "2D" and "3D" keys are left empty, but since they are standard technical terms, they should remain as is in French.
Apply this diff:
- "2D": "", - "3D": "", + "2D": "2D", + "3D": "3D",
Line range hint
52-52
: Add missing translation for "General shortcuts"The "General shortcuts" key is missing its French translation while surrounding keys are properly translated.
Apply this diff:
- "General shortcuts": "", + "General shortcuts": "Raccourcis généraux",
Missing translations detected across language files
Several keys are missing translations in multiple language files:
- Only Japanese (ja.json) has translations for most of the new keys
- Spanish (es.json) only has "Schema not found" translated
- English (en.json), French (fr.json), and Chinese (zh.json) are missing translations for all the checked keys:
- "Schema not found"
- "3D visualization is not supported in this browser"
- "Subworkflow Node"
- "Manage Projects"
- "Trigger1"
- "Connection Error"
🔗 Analysis chain
Line range hint
1-232
: Verify translations across all language filesThe changes to this French localization file are part of a broader update across multiple language files. Ensure that all new keys are consistently added and translated across all supported languages.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for consistency of new keys across all language files # Find all locale files echo "Checking locale files for consistency..." for locale in ui/src/lib/i18n/locales/*.json; do echo "=== $(basename "$locale") ===" # Check for empty translations of the new keys jq -r ' .["Schema not found"], .["3D visualization is not supported in this browser"], .["Subworkflow Node"], .["Manage Projects"], .["Trigger1"], .["Connection Error"] ' "$locale" doneLength of output: 2020
ui/src/lib/yjs/socketYjsManager.ts (1)
274-303
: Ensure proper cleanup of event listeners in 'destroy' methodTo prevent potential memory leaks and ensure proper resource management, consider removing event listeners from both the WebSocket and the Yjs document when destroying the manager.
Apply this diff to add event listener removal:
destroy() { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); } + // Remove WebSocket event listeners + if (this.ws) { + this.ws.removeEventListener("open", this.onConnectionEstablished); + this.ws.removeEventListener("close", this.onConnectionDisconnect); + this.ws.removeEventListener("error", this.onConnectionError); + this.ws.removeEventListener("message", this.handleMessage); + } + // Remove Y.Doc event listeners + this.doc.off("update", this.onDocUpdate); if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.sendFlowMessage({ event: { tag: "Emit", content: { data: "" }, }, session_command: { tag: "End", content: { project_id: this.projectId || "", user: { id: this.doc.clientID.toString(), tenant_id: this.projectId, name: "defaultName", email: "[email protected]", }, }, }, }).finally(() => { this.ws.close(); }); } this.doc.destroy(); }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (10)
ui/src/lib/i18n/locales/en.json
(1 hunks)ui/src/lib/i18n/locales/es.json
(1 hunks)ui/src/lib/i18n/locales/fr.json
(1 hunks)ui/src/lib/i18n/locales/ja.json
(1 hunks)ui/src/lib/i18n/locales/zh.json
(1 hunks)ui/src/lib/utils.ts
(1 hunks)ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)ui/src/lib/yjs/types.ts
(2 hunks)ui/src/lib/yjs/useYjsStore.ts
(5 hunks)ui/src/types/user.ts
(1 hunks)
🔇 Additional comments (5)
ui/src/lib/utils.ts (1)
9-11
: Verify usage context in socket manager.
Since this is part of the socket Yjs manager changes, let's verify how this utility is being used to ensure it's not creating potential performance issues.
✅ Verification successful
Let me gather more context about the sleep usage in the socket manager to understand its purpose better.
Sleep usage in socket manager is appropriate and safe
The sleep(100)
usage in the isReady()
method is a standard polling mechanism to check socket readiness with a reasonable delay. It's used in conjunction with recursion to wait for the socket connection to be established, which is a common pattern in WebSocket implementations. The delay is small (100ms) and the recursion ends as soon as socketReady
becomes true, which happens in the onReady()
method when the socket is properly initialized.
Key observations:
- The sleep is only used during the initial connection phase
- It's properly wrapped in async/await
- The recursion has a clear exit condition (
socketReady = true
) - The delay is reasonable for a WebSocket handshake
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for sleep function usage in socket-related files
rg -l "sleep\(" | rg -i "socket|yjs" | xargs rg -A 5 "sleep\("
# Look for any retry mechanisms or timing-sensitive operations
rg -A 10 "new SocketYjsManager|extends SocketYjsManager"
Length of output: 2404
Script:
#!/bin/bash
# Get the implementation context of isReady method where sleep is used
ast-grep --pattern 'isReady() {
$$$
}'
# Get the socket manager class implementation to understand the overall flow
ast-grep --pattern 'class SocketYjsManager {
$$$
}'
Length of output: 19947
ui/src/types/user.ts (1)
18-18
: LGTM! The type definition is correct.
The optional tenant_id
property is properly typed and maintains backward compatibility with existing code.
ui/src/lib/yjs/types.ts (2)
Line range hint 1-39
: Verify integration with SocketYjsManager
These types appear to be used by the new SocketYjsManager. Let's verify the integration points to ensure proper usage.
✅ Verification successful
Types are properly integrated with SocketYjsManager and websocket backend
The verification shows that:
FlowMessage
andSessionCommand
types are correctly imported and used insocketYjsManager.ts
- The types are properly integrated with the Rust backend in
websocket/app/src/handler.rs
and related services - All command types defined in the TypeScript interface match their Rust counterparts
- The types are used consistently across the WebSocket communication layer for managing project edit sessions
The integration points are working as expected with no issues found.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check how these types are used in the socket manager
# Expected: Find usage of FlowMessage and SessionCommand in the manager
# Find SocketYjsManager implementation
ast-grep --pattern 'class SocketYjsManager {
$$$
}'
# Find usage of the types
rg -t typescript "FlowMessage|SessionCommand" -A 5
Length of output: 19992
Script:
#!/bin/bash
# Let's try searching with ripgrep without file type restriction
# and check the usage of FlowMessage and SessionCommand
# Search for FlowMessage usage
rg "FlowMessage" -A 2
# Search for SessionCommand usage
rg "SessionCommand" -A 2
# Search for imports of these types
rg "import.*(?:FlowMessage|SessionCommand)"
Length of output: 11072
1-1
: Verify the User type import usage
The User type is imported but only used optionally in SessionCommand. Let's verify if all required User properties are being utilized.
✅ Verification successful
The User type import is correctly used and required
The User type is appropriately imported and used in the SessionCommand type definition where it's needed as an optional property in the content object. The type definition from @flow/types/user
matches the usage requirements, containing id, name, email, and optional tenant_id properties which are necessary for user identification in session commands.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check the User type definition and its usage
# Expected: Find the User type definition to understand required properties
# Find User type definition
ast-grep --pattern 'type User = {
$$$
}'
# Find all usage of User type in the codebase
rg -t typescript "User" -A 5
Length of output: 345
Script:
#!/bin/bash
# Let's try with proper file extensions and check both the User type definition and its usage
# First, find all TypeScript files that mention User
rg "User" --type ts -A 5
# Then specifically look at our file of interest to see how User is used
cat ui/src/lib/yjs/types.ts
Length of output: 59778
ui/src/lib/yjs/useYjsStore.ts (1)
119-126
: Ensure managerRef
is initialized before using it
There is a possibility that managerRef.current
might be null
when undoTrackerActionWrapper
is called, which would prevent the transaction from occurring. Verify that managerRef.current
is properly initialized before any calls to undoTrackerActionWrapper
.
@@ -15,6 +15,7 @@ export type User = { | |||
id: string; | |||
name: string; | |||
email: string; | |||
tenant_id?: string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
API contract mismatch detected between frontend type and GraphQL schema
The frontend User type in ui/src/types/user.ts
includes a tenant_id
field, but this field is not present in the GraphQL schema's User type (as seen in the generated types). This mismatch could lead to runtime issues since:
- The GraphQL User type only includes: id, name, email, and host fields
- None of the GraphQL queries/mutations handle the tenant_id field
- The generated types don't include tenant_id in their definitions
🔗 Analysis chain
Verify API contract and data handling.
Since this type likely corresponds to API responses and requests, please ensure:
- The backend API contract matches this type definition
- Any serialization/deserialization logic properly handles the new
tenant_id
field
Let's check for API-related files and serialization logic:
Let me gather more information about the GraphQL schema and types to verify the API contract.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for API endpoints and serialization logic related to User type
# Check for API endpoints handling user data
rg -A 5 "user|me" "ui/src/api"
# Check for any serialization/deserialization logic
rg -A 5 "JSON\.(parse|stringify).*User" "ui/src"
# Look for GraphQL queries/mutations related to user
rg -A 5 "query.*User|mutation.*User" "ui/src"
Length of output: 17013
Script:
#!/bin/bash
# Check GraphQL schema and type definitions
rg -A 10 "type User|input.*User|interface.*User" "ui/src"
# Check for tenant_id usage in GraphQL queries/mutations
rg -A 5 "tenant_id" "ui/src/lib/gql"
# Look for generated types related to User
rg -A 5 "export (type|interface) User" "ui/src"
Length of output: 7778
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest refactor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pyshx, to address the API contract mismatch between the frontend User
type and the GraphQL schema, you have a couple of options:
-
Update the GraphQL Schema to Include
tenant_id
:If the
tenant_id
field is needed on the frontend, you should update the GraphQL schema to include this field in theUser
type. This ensures consistency between the frontend types and the backend API responses.Example GraphQL Schema Update:
type User { id: ID! name: String! email: String! host: String tenant_id: ID }
Make sure to also update any GraphQL resolvers to handle the
tenant_id
field appropriately. -
Remove
tenant_id
from the FrontendUser
Type:If the
tenant_id
is not required on the frontend, you can remove it from theUser
type definition inui/src/types/user.ts
to match the backend schema.Updated
ui/src/types/user.ts
:export type User = { id: string; name: string; email: string; // tenant_id?: string; // Remove this line };
Ensure that any code depending on
tenant_id
is also updated accordingly.
Let me know if you need further assistance with this refactor!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
ui/src/lib/yjs/socketYjsManager.ts (4)
235-249
: Refactor 'sendFlowMessage' to avoid unnecessary Promise creationThe
sendFlowMessage
method wraps an async operation inside a newPromise
, which is unnecessary and considered an anti-pattern. You can simplify the function by removing the explicitPromise
construction and letting the function's async nature handle the returnedPromise
.Apply this diff to simplify the function:
-private async sendFlowMessage(message: FlowMessage): Promise<void> { - return new Promise((resolve, reject) => { - if (this.ws.readyState !== WebSocket.OPEN) { - reject(new Error("WebSocket is not connected")); - return; - } - - try { - this.ws.send(JSON.stringify(message)); - resolve(); - } catch (error) { - reject(error); - } - }); +private async sendFlowMessage(message: FlowMessage): Promise<void> { + if (this.ws.readyState !== WebSocket.OPEN) { + throw new Error("WebSocket is not connected"); + } + try { + this.ws.send(JSON.stringify(message)); + } catch (error) { + throw error; + } }
141-159
: Handle unexpected 'event.data' types in 'handleMessage'Currently, the
handleMessage
method assumes thatevent.data
is either anArrayBuffer
or astring
. To enhance robustness, consider adding handling for unexpected data types to prevent silent failures or unhandled exceptions.Add an else clause to log or handle unexpected data types:
} else if (typeof event.data === "string") { // Handle text message const data = JSON.parse(event.data); if (data.type === "authenticate") { await this.onAuthenticateRequest(); } else if (data.type === "ready") { this.onReady(); + } else { + console.warn(`Unhandled message type received: ${data.type}`); } + } else { + console.warn("Received message with unexpected data type:", typeof event.data); }
147-155
: Add default case for unrecognized message typesIn the
handleMessage
method, only specific message types are processed. Adding a default case helps in logging or handling any unexpected message types, which is beneficial for debugging and future message implementations.Consider updating the code to handle unrecognized message types:
} else if (typeof event.data === "string") { // Handle text message const data = JSON.parse(event.data); if (data.type === "authenticate") { await this.onAuthenticateRequest(); } else if (data.type === "ready") { this.onReady(); + } else { + console.warn(`Received unrecognized message type: ${data.type}`); } }
68-68
: Remove console logs or adjust logging level for productionThe console log statement revealing the WebSocket connection details might not be suitable for a production environment. It could expose sensitive information or clutter the console output.
Consider removing the console log or using a debug logger:
- console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); + // For debugging purposes only: + // console.debug("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)
🔇 Additional comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)
135-139
: Verify that 'accessTokenProvider' is defined before use
The method onAuthenticateRequest
uses optional chaining when calling accessTokenProvider
, which could lead to token
being undefined if the provider is not set. Ensure that accessTokenProvider
is always defined when this method is invoked.
Please verify that accessTokenProvider
is always provided and cannot be undefined
at this point. If it can be undefined
, consider adding error handling or default behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
🧹 Outside diff range and nitpick comments (9)
websocket/crates/infra/src/persistence/redis/updates.rs (2)
125-132
: Use appropriate logging levels for error messagesCurrently, error conditions are logged using the
debug!
macro. For better visibility of errors, consider usingerror!
orwarn!
to ensure that these critical events are properly surfaced in the logs.Apply this change to adjust the logging level:
- debug!( + error!(Also applies to: 153-162, 175-178
150-164
: Refactor duplicate code when applying updatesThe code blocks applying
redis_update
andstream_update
are nearly identical. Extracting the shared logic into a helper function will reduce duplication and enhance maintainability.Apply this diff to refactor the duplicated code:
+fn apply_update( + txn: &mut TransactionMut, + update_data: &[u8], + update_name: &str, +) -> Result<(), FlowProjectRedisDataManagerError> { + if !update_data.is_empty() { + debug!("Applying {} of size {}", update_name, update_data.len()); + match Update::decode_v2(update_data) { + Ok(update) => { + if let Err(e) = txn.apply_update(update) { + error!("Error applying {}: {:?}", update_name, e); + return Err(FlowProjectRedisDataManagerError::from(e)); + } + } + Err(e) => { + error!("Error decoding {}: {:?}", update_name, e); + return Err(FlowProjectRedisDataManagerError::from(e)); + } + } + } + Ok(()) +} - if !redis_update.is_empty() { - debug!("Applying redis update of size {}", redis_update.len()); - match Update::decode_v2(&redis_update) { - Ok(update) => { - if let Err(e) = txn.apply_update(update) { - debug!("Error applying redis update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); - } - } - Err(e) => { - debug!("Error decoding redis update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); - } - } - } + apply_update(&mut txn, &redis_update, "redis update")?; - if !stream_update.is_empty() { - debug!("Applying stream update of size {}", stream_update.len()); - match Update::decode_v2(&stream_update) { - Ok(update) => { - if let Err(e) = txn.apply_update(update) { - debug!("Error applying stream update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); - } - } - Err(e) => { - debug!("Error decoding stream update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); - } - } - } + apply_update(&mut txn, &stream_update, "stream update")?;Also applies to: 166-180
websocket/app/examples/session_client.rs (2)
Line range hint
1-248
: Enhance example to better demonstrate integration patternsWhile this example effectively shows the WebSocket functionality, consider these improvements to better align with the PR's objectives:
- Add configuration management:
struct Config { host: String, port: u16, user_id: String, project_id: String, room_id: String, auth_token: String, }
- Add error handling examples for common WebSocket scenarios:
- Connection drops
- Authentication failures
- Invalid messages
- Add comments explaining the integration with the new SocketYjsManager:
// This example demonstrates the server-side handling of WebSocket connections // that integrate with the frontend SocketYjsManager (see ui/src/lib/yjs/socketYjsManager.ts)Would you like me to provide a complete example implementation with these improvements?
Session commands are misaligned between frontend and backend
The verification reveals mismatches between the Rust and TypeScript implementations:
- Frontend
SessionCommand
type includes additional commands:MergeUpdates
- Frontend's command structure uses
tag
andcontent
fields, while backend uses named struct variants- Frontend's command content includes optional fields like
data
andupdated_by
not present in backendRequired changes:
- Add
MergeUpdates
variant to Rust'sSessionCommand
enum- Align the command structure by either:
- Using tagged unions in Rust to match TypeScript's structure, or
- Updating TypeScript to use direct field access like Rust
🔗 Analysis chain
Line range hint
13-48
: Verify alignment with frontend session commandsThe
Event
andSessionCommand
enums must match the protocol expected by the new SocketYjsManager.Let's verify the alignment:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check if the session commands align with the frontend implementation # Search for corresponding TypeScript types/interfaces rg -t typescript "type (SessionCommand|FlowMessage)" -A 10 # Search for handling of these commands in the socket manager ast-grep --pattern 'class SocketYjsManager { $$$ handle$_($$$) { $$$ } $$$ }'Length of output: 187
Script:
#!/bin/bash # Let's try different approaches to find the frontend implementation # Search for any files containing SocketYjsManager fd -e ts -e js -e tsx -e jsx . | xargs rg "SocketYjsManager" # Search for session-related types/interfaces in all possible frontend files fd -e ts -e js -e tsx -e jsx . | xargs rg "type.*Session|interface.*Session|enum.*Session" # Look for websocket message handling in frontend fd -e ts -e js -e tsx -e jsx . | xargs rg "socket\.on|socket\.emit" # Search for any files that might contain session command definitions fd -e ts -e js -e tsx -e jsx . | xargs rg "Start|End|Complete|CheckStatus|AddTask|RemoveTask"Length of output: 4489
Script:
#!/bin/bash # Let's examine the session command types in the frontend # Get the complete SessionCommand type definition cat ui/src/lib/yjs/types.ts # Check the actual socket message handling in SocketYjsManager cat ui/src/lib/yjs/socketYjsManager.tsLength of output: 9751
ui/src/lib/yjs/socketYjsManager.ts (1)
254-274
: Implement rate limiting for updatesThe update handling lacks rate limiting, which could lead to performance issues with rapid updates.
Consider implementing a debounce mechanism:
+ private updateDebounceTimeout: NodeJS.Timeout | null = null; + private readonly DEBOUNCE_DELAY = 50; // ms protected onDocUpdate(update: Uint8Array, origin: unknown) { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { + if (this.updateDebounceTimeout) { + clearTimeout(this.updateDebounceTimeout); + } + + this.updateDebounceTimeout = setTimeout(() => { const stateVector = Y.encodeStateVectorFromUpdateV2(update); const diffUpdate = Y.diffUpdateV2(update, stateVector); this.sendFlowMessage({ // ... existing message structure }); + }, this.DEBOUNCE_DELAY); } }websocket/crates/services/src/manage_project_edit_session.rs (4)
283-287
: Consider adding backoff strategy for channel errors.While the current error handling is good, consider implementing an exponential backoff strategy for channel errors to prevent overwhelming the system during issues.
- sleep(Duration::from_secs(1)).await; + let backoff = ExponentialBackoff::default(); + backoff.retry(|| async { + // Retry logic here + }).await;
169-183
: Add error case logging in Complete command.The Complete command's error handling could be enhanced with explicit error logging.
if let Some(mut session) = self.get_latest_session(&project_id).await? { debug!("Found latest session: {:?}", session); let result = self.complete_job_if_met_requirements(&mut session).await; - debug!("Job completion result: {:?}", result); + match &result { + Ok(_) => debug!("Job completion successful"), + Err(e) => debug!("Job completion failed: {:?}", e), + } result?;
Line range hint
13-20
: Consider making timeout constants configurable.The hardcoded timeout values could be made configurable through environment variables or configuration files for better flexibility in different environments.
-const MAX_EMPTY_SESSION_DURATION: Duration = Duration::from_secs(10); -const JOB_COMPLETION_DELAY: Duration = Duration::from_secs(5); +lazy_static! { + static ref MAX_EMPTY_SESSION_DURATION: Duration = Duration::from_secs( + std::env::var("MAX_EMPTY_SESSION_DURATION_SECS") + .unwrap_or("10".to_string()) + .parse() + .unwrap_or(10) + ); + static ref JOB_COMPLETION_DELAY: Duration = Duration::from_secs( + std::env::var("JOB_COMPLETION_DELAY_SECS") + .unwrap_or("5".to_string()) + .parse() + .unwrap_or(5) + ); +}
Line range hint
396-421
: Consider adding metrics for session management.The session management logic would benefit from metrics collection to monitor system health and performance.
pub async fn end_editing_session_if_conditions_met( &self, session: &mut ProjectEditingSession, data: &ManageProjectEditSessionTaskData, ) -> Result<(), ProjectServiceError> { + let start = std::time::Instant::now(); if let Some(client_count) = *data.client_count.read().await { if client_count == 0 { if let Some(clients_disconnected_at) = *data.clients_disconnected_at.read().await { let current_time = Utc::now(); let clients_disconnection_elapsed_time = current_time - clients_disconnected_at; if clients_disconnection_elapsed_time .to_std() .map_err(ProjectServiceError::ChronoDurationConversionError)? > MAX_EMPTY_SESSION_DURATION { self.project_service .end_session("system".to_string(), session.clone()) .await?; + metrics::increment_counter!("session_ended_by_timeout"); } } } } + metrics::histogram!("session_check_duration_ms", start.elapsed().as_millis() as f64); Ok(()) }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)websocket/app/examples/session_client.rs
(1 hunks)websocket/crates/infra/src/persistence/redis/updates.rs
(1 hunks)websocket/crates/services/src/manage_project_edit_session.rs
(1 hunks)
🔇 Additional comments (3)
websocket/crates/infra/src/persistence/redis/updates.rs (1)
104-109
: Enhanced logging improves traceability
The added debug statements at the start of merge_updates_internal
provide valuable context for tracing the merge operation.
websocket/crates/services/src/manage_project_edit_session.rs (2)
151-152
: LGTM! Comprehensive debug logging implementation.
The debug logging implementation provides excellent visibility into command processing flow, with clear entry and exit points.
Also applies to: 290-290
184-200
: 🛠️ Refactor suggestion
Consider adding data validation for MergeUpdates.
The MergeUpdates command should validate the data size before processing to prevent potential memory issues.
SessionCommand::MergeUpdates {
project_id,
data,
updated_by,
} => {
+ const MAX_UPDATE_SIZE: usize = 10 * 1024 * 1024; // 10MB
+ if data.len() > MAX_UPDATE_SIZE {
+ debug!("Update size {} exceeds maximum allowed size", data.len());
+ return Err(ProjectServiceError::InvalidInput("Update size too large".into()));
+ }
debug!(
"Merging updates for project: {}, updated by: {:?}, data length: {}",
project_id,
updated_by,
data.len()
);
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | ||
wsUrl.searchParams.set("project_id", data.projectId); | ||
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Security: Avoid exposing sensitive data in URL parameters
The WebSocket URL includes sensitive information (user_id, project_id, token) as query parameters, which can be logged or exposed in various ways. Consider using headers or a more secure authentication mechanism.
Consider:
- Using a secure WebSocket handshake protocol
- Moving authentication to headers or a dedicated authentication message
- Implementing token refresh mechanism
Apply this diff as a starting point:
- wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
- wsUrl.searchParams.set("project_id", data.projectId);
- wsUrl.searchParams.set("token", token);
+ // Initialize basic connection
+ this.ws = new WebSocket(wsUrl.href);
+
+ // Send authentication message after connection
+ this.ws.addEventListener("open", () => {
+ this.ws.send(JSON.stringify({
+ type: "authenticate",
+ user_id: this.doc.clientID.toString(),
+ project_id: data.projectId,
+ token
+ }));
+ });
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pyshx Long term this is better probably. Make sure to look at this suggestion.
coderabbitai, I don't need you to do anything or reply.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
// Initialize with default workflow if empty | ||
if (yWorkflows.length === 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't necessary since the doc will always be new, and the useState's initial values (aka this code) will never re-run.
manager | ||
.setupSocket({ | ||
url: websocketUrl, | ||
roomId: workflowId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The workflowId is probably needing to be changed to projectId. Unless we want a room created and used per workflow/subworkflow. @pyshx What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah since we are passing the projectId too, I guess that IS the idea. Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But each room will have the same Y.array doc with all workflows anyways, so if anything this will just cause some perf issues around changing between canvas... hmmm
// Create and setup socket manager | ||
if (!doc) return; | ||
const manager = new SocketYjsManager(doc); | ||
manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be awaited, right? If not an un-initiated manager will potentially be passed to the undomgr
if (!websocketUrl || !websocketToken || !workflowId || !currentProject?.id) | ||
return; | ||
|
||
const doc = managerRef.current?.getDoc() || yWorkflows.doc; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't necessary since the useEffect is already doing a cleanup itself in the return statement. (aka managerRef.current
will always be null at this point. If that isn't what you were hoping for, this code needs a refactor a bit probably)
ui/src/lib/yjs/socketYjsManager.ts
Outdated
content: { | ||
project_id: this.projectId || "", | ||
data: new Uint8Array(diffUpdate), | ||
updated_by: this.doc.clientID.toString() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used internally only for the websocket server to be able to resolve conflicts between session users? If we want to be able to reference this across sessions, we don't want the clientID, which I believe is auto-generated on each new session connection. We would want the user id probably.
return () => { | ||
manager.destroy(); | ||
undoMngr.destroy(); | ||
managerRef.current = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is manager.destroy()
necessary if we are setting the managerRef.current to null anyways?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)
14-19
: Add type safety to binary message creationThe binary message creation utility could benefit from additional type safety.
Consider this improvement:
-function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { +function createBinaryMessage(type: MessageType, data: Uint8Array | ArrayBuffer): Uint8Array { + const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; const message = new Uint8Array(data.length + 1); message[0] = type; - message.set(data, 1); + message.set(buffer, 1); return message; }
29-31
: Make reconnection parameters configurableThe reconnection parameters are hardcoded. Consider making them configurable through constructor options for better flexibility.
Consider this improvement:
+interface SocketYjsManagerOptions { + maxReconnectAttempts?: number; + reconnectDelay?: number; +} export class SocketYjsManager { protected reconnectAttempts = 0; - protected maxReconnectAttempts = 5; - protected reconnectDelay = 1000; + protected maxReconnectAttempts: number; + protected reconnectDelay: number; - constructor(doc: Y.Doc) { + constructor(doc: Y.Doc, options: SocketYjsManagerOptions = {}) { this.doc = doc; + this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5; + this.reconnectDelay = options.reconnectDelay ?? 1000;
try { | ||
if (this.ws) { | ||
const originalUrl = new URL(this.ws.url); | ||
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`; | ||
const roomId = originalUrl.pathname.slice(1); | ||
await this.setupSocket({ | ||
url: baseUrl, | ||
roomId, | ||
projectId: this.projectId || "", | ||
accessTokenProvider: this.accessTokenProvider || (() => ""), | ||
}); | ||
} | ||
} catch (error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add connection cleanup before reconnection
The reconnection attempt should clean up the existing connection before establishing a new one.
Consider this improvement:
this.reconnectTimer = setTimeout(async () => {
try {
if (this.ws) {
+ // Clean up existing connection
+ this.ws.removeEventListener("open", this.onConnectionEstablished);
+ this.ws.removeEventListener("close", this.onConnectionDisconnect);
+ this.ws.removeEventListener("error", this.onConnectionError);
+ this.ws.removeEventListener("message", this.handleMessage);
+ if (this.ws.readyState === WebSocket.OPEN) {
+ this.ws.close();
+ }
+
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try { | |
if (this.ws) { | |
const originalUrl = new URL(this.ws.url); | |
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`; | |
const roomId = originalUrl.pathname.slice(1); | |
await this.setupSocket({ | |
url: baseUrl, | |
roomId, | |
projectId: this.projectId || "", | |
accessTokenProvider: this.accessTokenProvider || (() => ""), | |
}); | |
} | |
} catch (error) { | |
try { | |
if (this.ws) { | |
// Clean up existing connection | |
this.ws.removeEventListener("open", this.onConnectionEstablished); | |
this.ws.removeEventListener("close", this.onConnectionDisconnect); | |
this.ws.removeEventListener("error", this.onConnectionError); | |
this.ws.removeEventListener("message", this.handleMessage); | |
if (this.ws.readyState === WebSocket.OPEN) { | |
this.ws.close(); | |
} | |
const originalUrl = new URL(this.ws.url); | |
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`; | |
const roomId = originalUrl.pathname.slice(1); | |
await this.setupSocket({ | |
url: baseUrl, | |
roomId, | |
projectId: this.projectId || "", | |
accessTokenProvider: this.accessTokenProvider || (() => ""), | |
}); | |
} | |
} catch (error) { |
ui/src/lib/yjs/socketYjsManager.ts
Outdated
protected async initializeRoom() { | ||
try { | ||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Create", | ||
content: { room_id: this.doc.clientID.toString() }, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Join", | ||
content: { room_id: this.doc.clientID.toString() }, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Emit", | ||
content: { data: "" }, | ||
}, | ||
session_command: { | ||
tag: "Start", | ||
content: { | ||
project_id: this.projectId || "", | ||
user: { | ||
id: this.doc.clientID.toString(), | ||
tenant_id: this.projectId, | ||
name: "defaultName", | ||
email: "[email protected]", | ||
}, | ||
}, | ||
}, | ||
}); | ||
|
||
await this.syncData(); | ||
} catch (error) { | ||
console.error("Failed to initialize room:", error); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add specific error handling for room initialization steps
The error handling in initializeRoom could be more granular to identify which step failed.
Consider this improvement:
protected async initializeRoom() {
+ const steps = [
+ { name: 'Create Room', action: () => this.sendFlowMessage({
+ event: { tag: "Create", content: { room_id: this.doc.clientID.toString() } }
+ })},
+ { name: 'Join Room', action: () => this.sendFlowMessage({
+ event: { tag: "Join", content: { room_id: this.doc.clientID.toString() } }
+ })},
+ { name: 'Start Session', action: () => this.sendFlowMessage({
+ event: { tag: "Emit", content: { data: "" } },
+ session_command: { /* ... existing command ... */ }
+ })},
+ { name: 'Sync Data', action: () => this.syncData() }
+ ];
+
try {
- await this.sendFlowMessage({ /* ... */ });
- await this.sendFlowMessage({ /* ... */ });
- await this.sendFlowMessage({ /* ... */ });
- await this.syncData();
+ for (const step of steps) {
+ try {
+ await step.action();
+ } catch (error) {
+ throw new Error(`Failed at step '${step.name}': ${error instanceof Error ? error.message : 'Unknown error'}`);
+ }
+ }
} catch (error) {
- console.error("Failed to initialize room:", error);
+ console.error(`Room initialization failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
+ throw error;
}
}
Committable suggestion skipped: line range outside the PR's diff.
try { | ||
const token = await this.accessTokenProvider(); | ||
const wsUrl = new URL(data.url); | ||
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | ||
wsUrl.pathname = `/${data.roomId}`; | ||
|
||
// Add query parameters for authentication | ||
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | ||
wsUrl.searchParams.set("project_id", data.projectId); | ||
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers | ||
|
||
this.ws = new WebSocket(wsUrl.href); | ||
this.ws.binaryType = "arraybuffer"; | ||
|
||
this.setupWebSocketListeners(); | ||
this.setupDocListeners(); | ||
|
||
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); | ||
} catch (error) { | ||
console.error("Failed to setup WebSocket:", error); | ||
throw error; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling in setupSocket
The error handling could be more specific and provide better context for debugging.
Consider this improvement:
try {
const token = await this.accessTokenProvider();
+ if (!token) {
+ throw new Error("Failed to obtain access token");
+ }
const wsUrl = new URL(data.url);
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
// Add query parameters for authentication
wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
wsUrl.searchParams.set("project_id", data.projectId);
wsUrl.searchParams.set("token", token);
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
this.setupWebSocketListeners();
this.setupDocListeners();
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname);
} catch (error) {
- console.error("Failed to setup WebSocket:", error);
+ const errorMessage = error instanceof Error ? error.message : "Unknown error";
+ console.error(`Failed to setup WebSocket: ${errorMessage}`);
+ if (error instanceof TypeError) {
+ throw new Error(`Invalid WebSocket URL: ${errorMessage}`);
+ }
throw error;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try { | |
const token = await this.accessTokenProvider(); | |
const wsUrl = new URL(data.url); | |
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | |
wsUrl.pathname = `/${data.roomId}`; | |
// Add query parameters for authentication | |
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | |
wsUrl.searchParams.set("project_id", data.projectId); | |
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers | |
this.ws = new WebSocket(wsUrl.href); | |
this.ws.binaryType = "arraybuffer"; | |
this.setupWebSocketListeners(); | |
this.setupDocListeners(); | |
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); | |
} catch (error) { | |
console.error("Failed to setup WebSocket:", error); | |
throw error; | |
} | |
try { | |
const token = await this.accessTokenProvider(); | |
if (!token) { | |
throw new Error("Failed to obtain access token"); | |
} | |
const wsUrl = new URL(data.url); | |
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | |
wsUrl.pathname = `/${data.roomId}`; | |
// Add query parameters for authentication | |
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | |
wsUrl.searchParams.set("project_id", data.projectId); | |
wsUrl.searchParams.set("token", token); | |
this.ws = new WebSocket(wsUrl.href); | |
this.ws.binaryType = "arraybuffer"; | |
this.setupWebSocketListeners(); | |
this.setupDocListeners(); | |
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); | |
} catch (error) { | |
const errorMessage = error instanceof Error ? error.message : "Unknown error"; | |
console.error(`Failed to setup WebSocket: ${errorMessage}`); | |
if (error instanceof TypeError) { | |
throw new Error(`Invalid WebSocket URL: ${errorMessage}`); | |
} | |
throw error; | |
} |
ui/src/lib/yjs/socketYjsManager.ts
Outdated
private async sendFlowMessage(message: FlowMessage): Promise<void> { | ||
return new Promise((resolve, reject) => { | ||
if (this.ws.readyState !== WebSocket.OPEN) { | ||
reject(new Error("WebSocket is not connected")); | ||
return; | ||
} | ||
|
||
try { | ||
this.ws.send(JSON.stringify(message)); | ||
resolve(); | ||
} catch (error) { | ||
reject(error); | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add timeout to sendFlowMessage
The sendFlowMessage method should include a timeout to prevent hanging operations.
Consider this improvement:
- private async sendFlowMessage(message: FlowMessage): Promise<void> {
+ private async sendFlowMessage(message: FlowMessage, timeoutMs: number = 5000): Promise<void> {
return new Promise((resolve, reject) => {
+ const timeoutId = setTimeout(() => {
+ reject(new Error(`Flow message timed out after ${timeoutMs}ms`));
+ }, timeoutMs);
+
if (this.ws.readyState !== WebSocket.OPEN) {
+ clearTimeout(timeoutId);
reject(new Error("WebSocket is not connected"));
return;
}
try {
this.ws.send(JSON.stringify(message));
+ clearTimeout(timeoutId);
resolve();
} catch (error) {
+ clearTimeout(timeoutId);
reject(error);
}
});
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private async sendFlowMessage(message: FlowMessage): Promise<void> { | |
return new Promise((resolve, reject) => { | |
if (this.ws.readyState !== WebSocket.OPEN) { | |
reject(new Error("WebSocket is not connected")); | |
return; | |
} | |
try { | |
this.ws.send(JSON.stringify(message)); | |
resolve(); | |
} catch (error) { | |
reject(error); | |
} | |
}); | |
} | |
private async sendFlowMessage(message: FlowMessage, timeoutMs: number = 5000): Promise<void> { | |
return new Promise((resolve, reject) => { | |
const timeoutId = setTimeout(() => { | |
reject(new Error(`Flow message timed out after ${timeoutMs}ms`)); | |
}, timeoutMs); | |
if (this.ws.readyState !== WebSocket.OPEN) { | |
clearTimeout(timeoutId); | |
reject(new Error("WebSocket is not connected")); | |
return; | |
} | |
try { | |
this.ws.send(JSON.stringify(message)); | |
clearTimeout(timeoutId); | |
resolve(); | |
} catch (error) { | |
clearTimeout(timeoutId); | |
reject(error); | |
} | |
}); | |
} |
protected async handleMessage(event: MessageEvent) { | ||
try { | ||
if (event.data instanceof ArrayBuffer) { | ||
// Handle binary message (Yjs update) | ||
const update = new Uint8Array(event.data); | ||
await this.onPeerUpdate({ update }); | ||
} else if (typeof event.data === "string") { | ||
// Handle text message | ||
const data = JSON.parse(event.data); | ||
if (data.type === "authenticate") { | ||
await this.onAuthenticateRequest(); | ||
} else if (data.type === "ready") { | ||
this.onReady(); | ||
} | ||
} | ||
} catch (error) { | ||
console.error("Error handling message:", error); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve error handling in message processing
The error handling in handleMessage could be more specific and provide better context.
Consider this improvement:
protected async handleMessage(event: MessageEvent) {
try {
if (event.data instanceof ArrayBuffer) {
- // Handle binary message (Yjs update)
+ if (event.data.byteLength === 0) {
+ throw new Error("Received empty binary message");
+ }
const update = new Uint8Array(event.data);
await this.onPeerUpdate({ update });
} else if (typeof event.data === "string") {
- // Handle text message
+ if (!event.data) {
+ throw new Error("Received empty text message");
+ }
const data = JSON.parse(event.data);
+ if (!data.type) {
+ throw new Error("Message type not specified");
+ }
if (data.type === "authenticate") {
await this.onAuthenticateRequest();
} else if (data.type === "ready") {
this.onReady();
+ } else {
+ console.warn(`Unhandled message type: ${data.type}`);
}
}
} catch (error) {
- console.error("Error handling message:", error);
+ const errorMessage = error instanceof Error ? error.message : "Unknown error";
+ console.error(`Error handling WebSocket message: ${errorMessage}`);
+ if (error instanceof SyntaxError) {
+ console.error("Failed to parse message:", event.data);
+ }
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
protected async handleMessage(event: MessageEvent) { | |
try { | |
if (event.data instanceof ArrayBuffer) { | |
// Handle binary message (Yjs update) | |
const update = new Uint8Array(event.data); | |
await this.onPeerUpdate({ update }); | |
} else if (typeof event.data === "string") { | |
// Handle text message | |
const data = JSON.parse(event.data); | |
if (data.type === "authenticate") { | |
await this.onAuthenticateRequest(); | |
} else if (data.type === "ready") { | |
this.onReady(); | |
} | |
} | |
} catch (error) { | |
console.error("Error handling message:", error); | |
} | |
} | |
protected async handleMessage(event: MessageEvent) { | |
try { | |
if (event.data instanceof ArrayBuffer) { | |
if (event.data.byteLength === 0) { | |
throw new Error("Received empty binary message"); | |
} | |
const update = new Uint8Array(event.data); | |
await this.onPeerUpdate({ update }); | |
} else if (typeof event.data === "string") { | |
if (!event.data) { | |
throw new Error("Received empty text message"); | |
} | |
const data = JSON.parse(event.data); | |
if (!data.type) { | |
throw new Error("Message type not specified"); | |
} | |
if (data.type === "authenticate") { | |
await this.onAuthenticateRequest(); | |
} else if (data.type === "ready") { | |
this.onReady(); | |
} else { | |
console.warn(`Unhandled message type: ${data.type}`); | |
} | |
} | |
} catch (error) { | |
const errorMessage = error instanceof Error ? error.message : "Unknown error"; | |
console.error(`Error handling WebSocket message: ${errorMessage}`); | |
if (error instanceof SyntaxError) { | |
console.error("Failed to parse message:", event.data); | |
} | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)
14-19
: Add input validation to createBinaryMessage functionThe function should validate its inputs to prevent potential issues with invalid data.
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { + if (!(type in MessageType)) { + throw new Error(`Invalid message type: ${type}`); + } + if (!(data instanceof Uint8Array)) { + throw new Error('Data must be a Uint8Array'); + } const message = new Uint8Array(data.length + 1); message[0] = type; message.set(data, 1); return message; }
244-258
: Add message queue for handling disconnectionsMessages sent during disconnection are lost. Consider implementing a message queue to handle this scenario.
+ private messageQueue: FlowMessage[] = []; + private readonly MAX_QUEUE_SIZE = 100; + private async sendFlowMessage(message: FlowMessage): Promise<void> { return new Promise((resolve, reject) => { if (this.ws.readyState !== WebSocket.OPEN) { - reject(new Error("WebSocket is not connected")); - return; + if (this.messageQueue.length < this.MAX_QUEUE_SIZE) { + this.messageQueue.push(message); + resolve(); + } else { + reject(new Error("Message queue full, connection lost")); + } + return; } + + // Try to send queued messages first + while (this.messageQueue.length > 0 && this.ws.readyState === WebSocket.OPEN) { + const queuedMessage = this.messageQueue.shift(); + if (queuedMessage) { + this.ws.send(JSON.stringify(queuedMessage)); + } + } try { this.ws.send(JSON.stringify(message)); resolve(); } catch (error) { reject(error); } }); }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)ui/src/lib/yjs/types.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- ui/src/lib/yjs/types.ts
ui/src/lib/yjs/socketYjsManager.ts
Outdated
protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) { | ||
const update = data.update instanceof ArrayBuffer | ||
? new Uint8Array(data.update) | ||
: data.update; | ||
|
||
const currentState = Y.encodeStateAsUpdateV2(this.doc); | ||
const diffUpdate = Y.diffUpdateV2(update, currentState); | ||
Y.applyUpdateV2(this.doc, diffUpdate, 'peer'); | ||
this.onUpdateHandlers.forEach((handler) => handler(update)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add sync state validation before applying updates
The onPeerUpdate
method should validate the sync state before applying updates to prevent potential data corruption.
protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) {
+ if (!this.socketReady || !this.firstSyncComplete) {
+ console.warn('Received peer update before sync completion');
+ return;
+ }
+
const update = data.update instanceof ArrayBuffer
? new Uint8Array(data.update)
: data.update;
const currentState = Y.encodeStateAsUpdateV2(this.doc);
const diffUpdate = Y.diffUpdateV2(update, currentState);
Y.applyUpdateV2(this.doc, diffUpdate, 'peer');
this.onUpdateHandlers.forEach((handler) => handler(update));
}
Committable suggestion skipped: line range outside the PR's diff.
const data = JSON.parse(event.data); | ||
if (data.type === "authenticate") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add explicit error handling for JSON parsing
The JSON parsing could fail with invalid data, and the error should be handled explicitly.
- const data = JSON.parse(event.data);
- if (data.type === "authenticate") {
+ let data;
+ try {
+ data = JSON.parse(event.data);
+ if (!data || typeof data !== 'object') {
+ throw new Error('Invalid message format');
+ }
+ } catch (error) {
+ console.error('Failed to parse message:', error);
+ return;
+ }
+ if (data.type === "authenticate") {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const data = JSON.parse(event.data); | |
if (data.type === "authenticate") { | |
let data; | |
try { | |
data = JSON.parse(event.data); | |
if (!data || typeof data !== 'object') { | |
throw new Error('Invalid message format'); | |
} | |
} catch (error) { | |
console.error('Failed to parse message:', error); | |
return; | |
} | |
if (data.type === "authenticate") { |
this.reconnectTimer = setTimeout(async () => { | ||
try { | ||
if (this.ws) { | ||
const originalUrl = new URL(this.ws.url); | ||
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`; | ||
const roomId = originalUrl.pathname.slice(1); | ||
await this.setupSocket({ | ||
url: baseUrl, | ||
roomId, | ||
projectId: this.projectId || "", | ||
accessTokenProvider: this.accessTokenProvider || (() => ""), | ||
}); | ||
} | ||
} catch (error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add WebSocket cleanup before reconnection
The reconnection attempt should properly close and clean up the existing WebSocket connection before creating a new one.
this.reconnectTimer = setTimeout(async () => {
try {
if (this.ws) {
+ if (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING) {
+ this.ws.close();
+ }
+ // Wait for the connection to fully close
+ await new Promise(resolve => setTimeout(resolve, 100));
+
const originalUrl = new URL(this.ws.url);
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`;
const roomId = originalUrl.pathname.slice(1);
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (3)
ui/src/lib/yjs/types.ts (3)
12-21
: Consider strengthening type safety for the data fieldThe
data
field is typed as an optional string, but it might contain structured data for different event types. Consider using a generic type parameter or specific data structures based on the event tag to ensure type safety at compile time.export type FlowMessage<T = unknown> = { event: { tag: "Create" | "Join" | "Leave" | "Emit"; content: { room_id?: string; data?: T; // Generic type parameter for better type safety }; }; session_command?: SessionCommand; };
45-50
: Add input validation for robustnessThe function should validate inputs to prevent potential issues:
- Ensure data is not null/empty
- Validate that the message type value is within enum bounds
export function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { if (!data?.length) { throw new Error("Data cannot be empty"); } if (!Object.values(MessageType).includes(type)) { throw new Error("Invalid message type"); } const message = new Uint8Array(data.length + 1); message[0] = type; message.set(data, 1); return message; }
Line range hint
1-50
: Consider error handling and sync strategy documentationThe types suggest a custom WebSocket integration with Yjs. To align with the PR objectives:
- Consider adding error-specific types for consistent error handling across the application
- Document the sync strategy (e.g., when SYNC vs UPDATE messages are used)
- Consider adding JSDoc comments explaining the integration with the Rust backend
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)ui/src/lib/yjs/types.ts
(2 hunks)
🔇 Additional comments (2)
ui/src/lib/yjs/types.ts (1)
40-43
: LGTM!
The MessageType enum is well-defined with clear numeric values for binary message type identification.
ui/src/lib/yjs/socketYjsManager.ts (1)
1-283
: Verify backend compatibility with the WebSocket protocol
The implementation assumes specific backend behavior for room creation, joining, and synchronization. Let's verify the protocol compatibility.
✅ Verification successful
Based on the search results, I can now generate a final response as we have sufficient evidence about the backend implementation:
Backend WebSocket protocol is compatible with the client implementation
The backend implementation in the websocket
directory confirms compatibility with the client's protocol:
- Room management matches client expectations (
Create
,Join
,Emit
events) - Message types and flow handling are consistent
- WebSocket handlers properly handle binary updates and JSON messages
Key matching components:
websocket/app/src/handlers/types.rs
defines the sameFlowMessage
structurewebsocket/app/src/handlers/room_handler.rs
implements the room lifecycle events- Binary message handling for Yjs updates is supported through the WebSocket infrastructure
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check backend WebSocket protocol implementation
# Look for WebSocket protocol documentation
echo "Searching for WebSocket protocol documentation..."
rg -l "websocket|WebSocket" -g "*.md"
# Look for backend WebSocket handler implementations
echo "Searching for backend WebSocket handlers..."
rg -l "tag: \"(Create|Join|Emit)\"" --type rust
Length of output: 293
Script:
#!/bin/bash
# Let's try a broader search for WebSocket-related code and documentation
# Search for WebSocket-related files
echo "=== Searching for WebSocket-related files ==="
fd -t f -e rs -e toml . | xargs rg -l "websocket|WebSocket"
# Search for Flow message types and handlers
echo -e "\n=== Searching for Flow message handling ==="
rg "enum.*Flow.*|struct.*Flow.*" --type rust -A 5
# Search for room management code
echo -e "\n=== Searching for room management code ==="
rg "room_id|RoomId" --type rust -A 3
# Look for protocol types
echo -e "\n=== Searching for protocol types ==="
rg "type.*Message|enum.*Message" --type rust -A 5
Length of output: 15685
export type SessionCommand = { | ||
tag: | ||
| "Start" | ||
| "End" | ||
| "Complete" | ||
| "CheckStatus" | ||
| "AddTask" | ||
| "RemoveTask" | ||
| "ListAllSnapshotsVersions" | ||
| "MergeUpdates"; | ||
content: { | ||
project_id?: string; | ||
user?: User; | ||
data?: Uint8Array; | ||
}; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance type safety and data consistency
Several observations:
- Some fields might be required depending on the command tag
- The
data
field type (Uint8Array) differs from FlowMessage's data field (string)
Consider using discriminated unions and consistent data types:
export type SessionCommand = {
tag: "CheckStatus" | "ListAllSnapshotsVersions"; // Commands without required fields
content: {
project_id?: string;
user?: User;
data?: Uint8Array;
};
} | {
tag: "Start" | "End" | "Complete" | "AddTask" | "RemoveTask" | "MergeUpdates"; // Commands requiring project and user
content: {
project_id: string; // Required
user: User; // Required
data?: Uint8Array;
};
};
ui/src/lib/yjs/socketYjsManager.ts
Outdated
protected async initializeRoom() { | ||
try { | ||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Create", | ||
content: { room_id: this.doc.clientID.toString() }, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Join", | ||
content: { room_id: this.doc.clientID.toString() }, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Emit", | ||
content: { data: "" }, | ||
}, | ||
session_command: { | ||
tag: "Start", | ||
content: { project_id: this.projectId }, | ||
}, | ||
}); | ||
|
||
await this.syncData(); | ||
} catch (error) { | ||
console.error("Failed to initialize room:", error); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add protection against race conditions in room initialization
The sequential flow messages could be affected by network delays or connection issues, potentially leading to race conditions.
protected async initializeRoom() {
+ if (!this.socketReady) {
+ throw new Error('Socket not ready for room initialization');
+ }
+
try {
+ const roomId = this.doc.clientID.toString();
+
await this.sendFlowMessage({
event: {
tag: "Create",
- content: { room_id: this.doc.clientID.toString() },
+ content: { room_id: roomId },
},
});
await this.sendFlowMessage({
event: {
tag: "Join",
- content: { room_id: this.doc.clientID.toString() },
+ content: { room_id: roomId },
},
});
Committable suggestion skipped: line range outside the PR's diff.
// Replace the setupSocket method | ||
async setupSocket(data: { | ||
url: string; | ||
roomId: string; | ||
projectId: string; | ||
accessTokenProvider: AccessTokenProvider; | ||
}) { | ||
this.accessTokenProvider = data.accessTokenProvider; | ||
this.projectId = data.projectId; | ||
|
||
try { | ||
const token = await this.accessTokenProvider(); | ||
const wsUrl = new URL(data.url); | ||
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | ||
wsUrl.pathname = `/${data.roomId}`; | ||
|
||
// Add query parameters for authentication | ||
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | ||
wsUrl.searchParams.set("project_id", data.projectId); | ||
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers | ||
|
||
this.ws = new WebSocket(wsUrl.href); | ||
this.ws.binaryType = "arraybuffer"; | ||
|
||
this.setupWebSocketListeners(); | ||
this.setupDocListeners(); | ||
|
||
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); | ||
} catch (error) { | ||
console.error("Failed to setup WebSocket:", error); | ||
throw error; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add input validation for setupSocket parameters
The method lacks validation for required parameters. URL construction should be more robust.
async setupSocket(data: {
url: string;
roomId: string;
projectId: string;
accessTokenProvider: AccessTokenProvider;
}) {
+ // Validate input parameters
+ if (!data.url || !data.roomId || !data.projectId) {
+ throw new Error('Required parameters missing');
+ }
+
+ // Validate URL format
+ try {
+ new URL(data.url);
+ } catch (error) {
+ throw new Error('Invalid URL format');
+ }
this.accessTokenProvider = data.accessTokenProvider;
this.projectId = data.projectId;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Replace the setupSocket method | |
async setupSocket(data: { | |
url: string; | |
roomId: string; | |
projectId: string; | |
accessTokenProvider: AccessTokenProvider; | |
}) { | |
this.accessTokenProvider = data.accessTokenProvider; | |
this.projectId = data.projectId; | |
try { | |
const token = await this.accessTokenProvider(); | |
const wsUrl = new URL(data.url); | |
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | |
wsUrl.pathname = `/${data.roomId}`; | |
// Add query parameters for authentication | |
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | |
wsUrl.searchParams.set("project_id", data.projectId); | |
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers | |
this.ws = new WebSocket(wsUrl.href); | |
this.ws.binaryType = "arraybuffer"; | |
this.setupWebSocketListeners(); | |
this.setupDocListeners(); | |
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); | |
} catch (error) { | |
console.error("Failed to setup WebSocket:", error); | |
throw error; | |
} | |
} | |
// Replace the setupSocket method | |
async setupSocket(data: { | |
url: string; | |
roomId: string; | |
projectId: string; | |
accessTokenProvider: AccessTokenProvider; | |
}) { | |
// Validate input parameters | |
if (!data.url || !data.roomId || !data.projectId) { | |
throw new Error('Required parameters missing'); | |
} | |
// Validate URL format | |
try { | |
new URL(data.url); | |
} catch (error) { | |
throw new Error('Invalid URL format'); | |
} | |
this.accessTokenProvider = data.accessTokenProvider; | |
this.projectId = data.projectId; | |
try { | |
const token = await this.accessTokenProvider(); | |
const wsUrl = new URL(data.url); | |
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | |
wsUrl.pathname = `/${data.roomId}`; | |
// Add query parameters for authentication | |
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | |
wsUrl.searchParams.set("project_id", data.projectId); | |
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers | |
this.ws = new WebSocket(wsUrl.href); | |
this.ws.binaryType = "arraybuffer"; | |
this.setupWebSocketListeners(); | |
this.setupDocListeners(); | |
console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); | |
} catch (error) { | |
console.error("Failed to setup WebSocket:", error); | |
throw error; | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)
17-19
: Consider making reconnection parameters configurableThe reconnection parameters are hardcoded, which limits flexibility for different network conditions or requirements.
Consider making these parameters configurable through the constructor:
- protected reconnectAttempts = 0; - protected maxReconnectAttempts = 5; - protected reconnectDelay = 1000; + protected reconnectAttempts = 0; + constructor( + doc: Y.Doc, + protected maxReconnectAttempts = 5, + protected reconnectDelay = 1000 + ) {
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)ui/src/lib/yjs/types.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- ui/src/lib/yjs/types.ts
ui/src/lib/yjs/socketYjsManager.ts
Outdated
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { | ||
const message = new Uint8Array(data.length + 1); | ||
message[0] = type; | ||
message.set(data, 1); | ||
return message; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add input validation to createBinaryMessage
The function doesn't validate the MessageType enum value or the data array.
Add validation to prevent invalid message creation:
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+ if (!Object.values(MessageType).includes(type)) {
+ throw new Error(`Invalid message type: ${type}`);
+ }
+ if (!data || !(data instanceof Uint8Array)) {
+ throw new Error('Invalid data: must be a Uint8Array');
+ }
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { | |
const message = new Uint8Array(data.length + 1); | |
message[0] = type; | |
message.set(data, 1); | |
return message; | |
} | |
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { | |
if (!Object.values(MessageType).includes(type)) { | |
throw new Error(`Invalid message type: ${type}`); | |
} | |
if (!data || !(data instanceof Uint8Array)) { | |
throw new Error('Invalid data: must be a Uint8Array'); | |
} | |
const message = new Uint8Array(data.length + 1); | |
message[0] = type; | |
message.set(data, 1); | |
return message; | |
} |
projectId: this.projectId || "", | ||
accessTokenProvider: this.accessTokenProvider || (() => ""), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid silent failures with empty projectId
Using empty strings as fallbacks for required parameters could lead to silent failures.
Consider throwing an error instead:
- projectId: this.projectId || "",
- accessTokenProvider: this.accessTokenProvider || (() => ""),
+ projectId: this.projectId ?? (() => {
+ throw new Error("Project ID is required for reconnection");
+ })(),
+ accessTokenProvider: this.accessTokenProvider ?? (() => {
+ throw new Error("Access token provider is required for reconnection");
+ }),
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
projectId: this.projectId || "", | |
accessTokenProvider: this.accessTokenProvider || (() => ""), | |
projectId: this.projectId ?? (() => { | |
throw new Error("Project ID is required for reconnection"); | |
})(), | |
accessTokenProvider: this.accessTokenProvider ?? (() => { | |
throw new Error("Access token provider is required for reconnection"); | |
}), |
ui/src/lib/yjs/socketYjsManager.ts
Outdated
private setupDocListeners() { | ||
this.doc.on("update", this.onDocUpdate); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add cleanup for doc listeners
The document update listener is not removed when the instance is destroyed, which could lead to memory leaks.
Add cleanup in the destroy method:
private setupDocListeners() {
this.doc.on("update", this.onDocUpdate);
}
+
+private cleanupDocListeners() {
+ this.doc.off("update", this.onDocUpdate);
+}
And in the destroy method:
destroy() {
+ this.cleanupDocListeners();
if (this.reconnectTimer) {
Committable suggestion skipped: line range outside the PR's diff.
ui/src/lib/yjs/socketYjsManager.ts
Outdated
async syncData() { | ||
await this.isReady(); | ||
|
||
const currentState = Y.encodeStateAsUpdateV2(this.doc); | ||
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); | ||
|
||
if (this.ws.readyState === WebSocket.OPEN) { | ||
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); | ||
this.ws.send(syncMessage); | ||
} | ||
|
||
if (!this.firstSyncComplete) { | ||
this.firstSyncComplete = true; | ||
queueMicrotask(() => this.syncData()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent parallel sync operations
Multiple calls to syncData could run in parallel, potentially causing race conditions.
Add a lock mechanism to prevent parallel sync operations:
+ private syncInProgress = false;
+
async syncData() {
+ if (this.syncInProgress) {
+ return;
+ }
+ this.syncInProgress = true;
+ try {
await this.isReady();
const currentState = Y.encodeStateAsUpdateV2(this.doc);
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState);
if (this.ws.readyState === WebSocket.OPEN) {
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
this.ws.send(syncMessage);
}
if (!this.firstSyncComplete) {
this.firstSyncComplete = true;
queueMicrotask(() => this.syncData());
}
+ } finally {
+ this.syncInProgress = false;
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async syncData() { | |
await this.isReady(); | |
const currentState = Y.encodeStateAsUpdateV2(this.doc); | |
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); | |
if (this.ws.readyState === WebSocket.OPEN) { | |
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); | |
this.ws.send(syncMessage); | |
} | |
if (!this.firstSyncComplete) { | |
this.firstSyncComplete = true; | |
queueMicrotask(() => this.syncData()); | |
} | |
} | |
private syncInProgress = false; | |
async syncData() { | |
if (this.syncInProgress) { | |
return; | |
} | |
this.syncInProgress = true; | |
try { | |
await this.isReady(); | |
const currentState = Y.encodeStateAsUpdateV2(this.doc); | |
const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); | |
if (this.ws.readyState === WebSocket.OPEN) { | |
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); | |
this.ws.send(syncMessage); | |
} | |
if (!this.firstSyncComplete) { | |
this.firstSyncComplete = true; | |
queueMicrotask(() => this.syncData()); | |
} | |
} finally { | |
this.syncInProgress = false; | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (3)
ui/src/lib/yjs/socketYjsManager.ts (3)
69-72
: Enhance error handling in setupSocketThe catch block logs and re-throws the error without providing context about which part of the setup failed.
Consider this improvement:
- } catch (error) { - console.error("Failed to setup WebSocket:", error); - throw error; + } catch (error) { + const phase = !this.accessTokenProvider ? "token provider" + : !this.projectId ? "project ID" + : "WebSocket connection"; + console.error(`WebSocket setup failed during ${phase}:`, error); + throw new Error(`Failed to setup WebSocket (${phase}): ${error instanceof Error ? error.message : String(error)}`); }
129-132
: Improve error handling in onConnectionErrorThe error handler provides minimal context and immediately triggers reconnection.
Consider this improvement:
- protected onConnectionError(error: Event) { - console.error("WebSocket error:", error); - this.reconnect(); + protected onConnectionError(error: Event) { + const wsState = this.ws?.readyState; + const stateMap = { + [WebSocket.CONNECTING]: 'CONNECTING', + [WebSocket.OPEN]: 'OPEN', + [WebSocket.CLOSING]: 'CLOSING', + [WebSocket.CLOSED]: 'CLOSED', + }; + console.error(`WebSocket error (state: ${stateMap[wsState ?? -1] ?? 'UNKNOWN'}):`, error); + if (wsState !== WebSocket.CLOSING && wsState !== WebSocket.CLOSED) { + this.reconnect(); + } }
377-392
: Consider using debug level loggingThe extensive logging in createBinaryMessage should be behind a debug flag to avoid performance impact in production.
Consider this improvement:
+const DEBUG = process.env.NODE_ENV !== 'production'; + function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { - console.log("Creating binary message:", { + DEBUG && console.log("Creating binary message:", { type, dataLength: data.length, rawData: Array.from(data) }); const message = new Uint8Array(data.length + 1); message[0] = type; message.set(data, 1); - console.log("Created message:", { + DEBUG && console.log("Created message:", { messageLength: message.length, messageType: message[0], rawMessage: Array.from(message) });
this.onUpdateHandlers.push(handler); | ||
} | ||
|
||
protected async destroy() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change destroy method visibility to public
The destroy method is marked as protected but should be public as it's needed for cleanup when the manager is no longer needed.
- protected async destroy() {
+ public async destroy() {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
protected async destroy() { | |
public async destroy() { |
export class SocketYjsManager { | ||
protected ws!: WebSocket; | ||
protected doc: Y.Doc; | ||
protected socketReady = false; | ||
protected firstSyncComplete = false; | ||
protected accessTokenProvider: AccessTokenProvider | undefined; | ||
protected projectId: string | undefined; | ||
protected onUpdateHandlers: ((update: Uint8Array) => void)[] = []; | ||
protected reconnectAttempts = 0; | ||
protected maxReconnectAttempts = 5; | ||
protected reconnectDelay = 1000; | ||
protected reconnectTimer: ReturnType<typeof setTimeout> | null = null; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider revising field visibility and optional types
The class has several issues in its field declarations:
- Protected visibility seems unnecessary for internal implementation
- Optional fields (
accessTokenProvider
andprojectId
) are used throughout the code without proper null checks
Consider this improvement:
-export class SocketYjsManager {
- protected ws!: WebSocket;
- protected doc: Y.Doc;
- protected socketReady = false;
- protected firstSyncComplete = false;
- protected accessTokenProvider: AccessTokenProvider | undefined;
- protected projectId: string | undefined;
- protected onUpdateHandlers: ((update: Uint8Array) => void)[] = [];
+export class SocketYjsManager {
+ private ws!: WebSocket;
+ private doc: Y.Doc;
+ private socketReady = false;
+ private firstSyncComplete = false;
+ private accessTokenProvider!: AccessTokenProvider;
+ private projectId!: string;
+ private onUpdateHandlers: ((update: Uint8Array) => void)[] = [];
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)
17-19
: Consider making reconnection parameters configurableThe reconnection parameters (attempts and delay) are hardcoded. Consider making these configurable through constructor options to allow different retry strategies for different environments.
- protected reconnectAttempts = 0; - protected maxReconnectAttempts = 5; - protected reconnectDelay = 1000; + protected reconnectAttempts = 0; + protected maxReconnectAttempts: number; + protected reconnectDelay: number; + + constructor(doc: Y.Doc, options: { + maxReconnectAttempts?: number; + reconnectDelay?: number; + } = {}) { + this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5; + this.reconnectDelay = options.reconnectDelay ?? 1000;
230-272
: Add logging control mechanismThe extensive debug logging could impact performance and pollute logs in production.
+ protected debugLogging = false; + + private log(...args: unknown[]) { + if (this.debugLogging) { + console.log(...args); + } + } + // Then replace console.log calls with this.log - console.log("Received peer update:", { + this.log("Received peer update:", {
if (event.data instanceof ArrayBuffer) { | ||
// Handle binary message (Yjs update) | ||
const update = new Uint8Array(event.data); | ||
await this.onPeerUpdate({ update }); | ||
} else if (typeof event.data === "string") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add binary message validation
The binary message handling lacks validation of the message format and size.
if (event.data instanceof ArrayBuffer) {
+ if (event.data.byteLength < 1) {
+ throw new Error('Invalid binary message: empty message');
+ }
+ if (event.data.byteLength > 1024 * 1024) { // 1MB limit
+ throw new Error('Invalid binary message: message too large');
+ }
const update = new Uint8Array(event.data);
+ const messageType = update[0];
+ if (!Object.values(MessageType).includes(messageType)) {
+ throw new Error(`Invalid binary message: unknown message type ${messageType}`);
+ }
await this.onPeerUpdate({ update });
Committable suggestion skipped: line range outside the PR's diff.
onUpdate(handler: (update: Uint8Array) => void) { | ||
this.onUpdateHandlers.push(handler); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling for update handlers
The update handler execution doesn't handle errors, which could prevent subsequent handlers from running.
onUpdate(handler: (update: Uint8Array) => void) {
- this.onUpdateHandlers.push(handler);
+ const wrappedHandler = (update: Uint8Array) => {
+ try {
+ handler(update);
+ } catch (error) {
+ console.error('Error in update handler:', error);
+ }
+ };
+ this.onUpdateHandlers.push(wrappedHandler);
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
onUpdate(handler: (update: Uint8Array) => void) { | |
this.onUpdateHandlers.push(handler); | |
} | |
onUpdate(handler: (update: Uint8Array) => void) { | |
const wrappedHandler = (update: Uint8Array) => { | |
try { | |
handler(update); | |
} catch (error) { | |
console.error('Error in update handler:', error); | |
} | |
}; | |
this.onUpdateHandlers.push(wrappedHandler); | |
} |
private reconnect() { | ||
if (this.reconnectTimer) { | ||
clearTimeout(this.reconnectTimer); | ||
} | ||
|
||
if (this.reconnectAttempts < this.maxReconnectAttempts) { | ||
this.reconnectAttempts++; | ||
const delay = | ||
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); | ||
|
||
this.reconnectTimer = setTimeout(async () => { | ||
try { | ||
if (this.ws) { | ||
const originalUrl = new URL(this.ws.url); | ||
const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`; | ||
const roomId = originalUrl.pathname.slice(1); | ||
await this.setupSocket({ | ||
url: baseUrl, | ||
roomId, | ||
projectId: this.projectId || "", | ||
accessTokenProvider: this.accessTokenProvider || (() => ""), | ||
}); | ||
} | ||
} catch (error) { | ||
console.error("Reconnection failed:", error); | ||
} | ||
}, delay); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add connection state tracking
The reconnection logic could benefit from explicit state tracking to prevent race conditions and multiple simultaneous reconnection attempts.
+ private connectionState: 'disconnected' | 'connecting' | 'connected' = 'disconnected';
+
private reconnect() {
+ if (this.connectionState === 'connecting') {
+ return; // Prevent multiple simultaneous reconnection attempts
+ }
+
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
+ this.connectionState = 'connecting';
Committable suggestion skipped: line range outside the PR's diff.
this.ws = new WebSocket(wsUrl.href); | ||
this.ws.binaryType = "arraybuffer"; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add connection timeout handling
The WebSocket connection attempt lacks a timeout, which could leave the application hanging if the server is unresponsive.
+ const connectionTimeout = setTimeout(() => {
+ if (this.ws.readyState === WebSocket.CONNECTING) {
+ this.ws.close();
+ throw new Error('WebSocket connection timeout');
+ }
+ }, 10000); // 10 seconds timeout
+
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
+
+ this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
this.ws = new WebSocket(wsUrl.href); | |
this.ws.binaryType = "arraybuffer"; | |
const connectionTimeout = setTimeout(() => { | |
if (this.ws.readyState === WebSocket.CONNECTING) { | |
this.ws.close(); | |
throw new Error('WebSocket connection timeout'); | |
} | |
}, 10000); // 10 seconds timeout | |
this.ws = new WebSocket(wsUrl.href); | |
this.ws.binaryType = "arraybuffer"; | |
this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true }); |
protected async initializeRoom() { | ||
try { | ||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Create", | ||
content: { room_id: this.doc.clientID.toString() }, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Join", | ||
content: { room_id: this.doc.clientID.toString() }, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Emit", | ||
content: { data: "" }, | ||
}, | ||
session_command: { | ||
tag: "Start", | ||
content: { | ||
project_id: this.projectId, | ||
user: { | ||
id: this.doc.clientID.toString(), | ||
name: "", | ||
email: "", | ||
tenant_id: "123" | ||
} | ||
}, | ||
}, | ||
}); | ||
|
||
await this.sendFlowMessage({ | ||
event: { | ||
tag: "Emit", | ||
content: { data: "" }, | ||
}, | ||
session_command: { | ||
tag: "AddTask", | ||
content: { | ||
project_id: this.projectId, | ||
}, | ||
}, | ||
}); | ||
|
||
await this.syncData(); | ||
} catch (error) { | ||
console.error("Failed to initialize room:", error); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add timeout for room initialization
The room initialization process could hang indefinitely if any of the steps fail silently.
protected async initializeRoom() {
+ const initTimeout = setTimeout(() => {
+ throw new Error('Room initialization timeout after 30 seconds');
+ }, 30000);
+
try {
await this.sendFlowMessage({
// ... existing code ...
});
+ } finally {
+ clearTimeout(initTimeout);
}
}
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)
17-19
: Make reconnection parameters configurableThe reconnection parameters are hardcoded. Consider making them configurable through constructor options to support different use cases and environments.
Apply this diff:
+interface SocketYjsManagerOptions { + maxReconnectAttempts?: number; + reconnectDelay?: number; +} export class SocketYjsManager { - protected reconnectAttempts = 0; - protected maxReconnectAttempts = 5; - protected reconnectDelay = 1000; + private reconnectAttempts = 0; + private maxReconnectAttempts: number; + private reconnectDelay: number; - constructor(doc: Y.Doc) { + constructor(doc: Y.Doc, options: SocketYjsManagerOptions = {}) { this.doc = doc; + this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5; + this.reconnectDelay = options.reconnectDelay ?? 1000;
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)
🔇 Additional comments (5)
ui/src/lib/yjs/socketYjsManager.ts (5)
376-381
: 🛠️ Refactor suggestion
Add input validation to createBinaryMessage
The binary message creation lacks validation for the input parameters.
Apply this diff:
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+ if (!Object.values(MessageType).includes(type)) {
+ throw new Error(`Invalid message type: ${type}`);
+ }
+ if (!(data instanceof Uint8Array)) {
+ throw new Error('Data must be a Uint8Array');
+ }
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}
Likely invalid or redundant comment.
185-191
:
Remove hardcoded user information
The user information contains hardcoded empty values and a static tenant ID, which could cause issues in production.
Add proper user information handling:
+interface UserInfo {
+ name: string;
+ email: string;
+ tenant_id: string;
+}
export class SocketYjsManager {
+ private userInfo: UserInfo;
- constructor(doc: Y.Doc) {
+ constructor(doc: Y.Doc, userInfo: UserInfo) {
this.doc = doc;
+ this.userInfo = userInfo;
// ... rest of constructor
}
// In initializeRoom:
- user: {
- id: this.doc.clientID.toString(),
- name: "",
- email: "",
- tenant_id: "123"
- }
+ user: {
+ id: this.doc.clientID.toString(),
+ ...this.userInfo
+ }
Likely invalid or redundant comment.
14-15
:
Make required fields non-optional
The accessTokenProvider
and projectId
fields are marked as optional but are required for the WebSocket connection to work. This could lead to runtime errors.
Apply this diff:
- protected accessTokenProvider: AccessTokenProvider | undefined;
- protected projectId: string | undefined;
+ private accessTokenProvider!: AccessTokenProvider;
+ private projectId!: string;
Likely invalid or redundant comment.
215-219
:
Replace recursive isReady with async polling
The recursive implementation of isReady
could cause stack overflow for long waiting periods.
Apply this diff:
async isReady(): Promise<boolean> {
- if (this.socketReady) return true;
- await sleep(100);
- return await this.isReady();
+ const maxAttempts = 100; // 10 seconds total
+ let attempts = 0;
+ while (!this.socketReady && attempts < maxAttempts) {
+ await sleep(100);
+ attempts++;
+ }
+ if (!this.socketReady) {
+ throw new Error('Connection timeout waiting for ready state');
+ }
+ return true;
}
Likely invalid or redundant comment.
58-60
:
Security: Move sensitive data from URL parameters to headers
Passing sensitive information like user_id
, project_id
, and token
in URL parameters exposes them to logs and potential security risks.
Consider using a more secure approach:
- wsUrl.searchParams.set("user_id", this.doc.clientID.toString());
- wsUrl.searchParams.set("project_id", data.projectId);
- wsUrl.searchParams.set("token", token);
+ const headers = {
+ 'X-User-ID': this.doc.clientID.toString(),
+ 'X-Project-ID': data.projectId,
+ 'Authorization': `Bearer ${token}`
+ };
+
+ // Note: Since WebSocket API doesn't support custom headers directly,
+ // consider implementing a handshake endpoint that returns a secure
+ // WebSocket URL after validating the credentials
Likely invalid or redundant comment.
console.log("Received peer update:", { | ||
updateLength: update.length, | ||
rawUpdate: Array.from(update) | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add conditional debug logging
The code contains extensive debug logging that should be conditional to avoid performance impact in production.
Add a debug mode flag and wrap all debug logs:
+ private debugMode = false;
+
+ private debugLog(message: string, data?: unknown) {
+ if (this.debugMode) {
+ console.log(message, data);
+ }
+ }
protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) {
- console.log("Received peer update:", {
+ this.debugLog("Received peer update:", {
Also applies to: 236-239, 242-245, 255-258, 261-264, 268-272, 291-296, 308-321
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
ui/src/lib/yjs/socketYjsManager.ts (2)
17-20
: Make reconnection parameters configurableThe reconnection parameters are hardcoded. Consider making them configurable through the constructor to allow different retry strategies for different environments.
- protected reconnectAttempts = 0; - protected maxReconnectAttempts = 5; - protected reconnectDelay = 1000; + protected reconnectAttempts = 0; + constructor( + doc: Y.Doc, + private config: { + maxReconnectAttempts?: number; + reconnectDelay?: number; + } = {} + ) { + this.maxReconnectAttempts = config.maxReconnectAttempts ?? 5; + this.reconnectDelay = config.reconnectDelay ?? 1000;
25-34
: Use class fields with arrow functions to avoid manual bindingThe manual method binding in the constructor can be eliminated by using class fields with arrow functions, making the code more maintainable and less prone to binding errors.
- // Bind methods - this.onConnectionEstablished = this.onConnectionEstablished.bind(this); - this.onConnectionDisconnect = this.onConnectionDisconnect.bind(this); - this.onConnectionError = this.onConnectionError.bind(this); - this.onAuthenticateRequest = this.onAuthenticateRequest.bind(this); - this.onDocUpdate = this.onDocUpdate.bind(this); - this.onReady = this.onReady.bind(this); - this.onPeerUpdate = this.onPeerUpdate.bind(this); - this.handleMessage = this.handleMessage.bind(this); - this.reconnect = this.reconnect.bind(this); + // Methods will be automatically bound when defined as class fields + protected onConnectionEstablished = () => { + this.reconnectAttempts = 0; + this.socketReady = true; + this.initializeRoom().catch(console.error); + };
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)
🔇 Additional comments (1)
ui/src/lib/yjs/socketYjsManager.ts (1)
369-374
: 🛠️ Refactor suggestion
Add size validation for binary messages
The binary message creation should validate the input data size to prevent memory issues with large updates.
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+ const MAX_MESSAGE_SIZE = 1024 * 1024; // 1MB limit
+ if (data.length > MAX_MESSAGE_SIZE) {
+ throw new Error(`Message size exceeds limit: ${data.length} bytes`);
+ }
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}
Likely invalid or redundant comment.
const wsUrl = new URL(data.url); | ||
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | ||
wsUrl.pathname = `/${data.roomId}`; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add URL protocol validation
The URL protocol replacement assumes HTTP/HTTPS protocols. Add validation to ensure only supported protocols are processed.
const wsUrl = new URL(data.url);
+ if (!wsUrl.protocol.startsWith('http')) {
+ throw new Error('URL protocol must be HTTP or HTTPS');
+ }
wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
wsUrl.pathname = `/${data.roomId}`;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const wsUrl = new URL(data.url); | |
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | |
wsUrl.pathname = `/${data.roomId}`; | |
const wsUrl = new URL(data.url); | |
if (!wsUrl.protocol.startsWith('http')) { | |
throw new Error('URL protocol must be HTTP or HTTPS'); | |
} | |
wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); | |
wsUrl.pathname = `/${data.roomId}`; |
protected onConnectionError(error: Event) { | ||
console.error("WebSocket error:", error); | ||
this.reconnect(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve error event typing
The error event parameter should use the specific ErrorEvent
type instead of the generic Event
type.
- protected onConnectionError(error: Event) {
+ protected onConnectionError(error: ErrorEvent) {
console.error("WebSocket error:", error);
+ console.error("Error details:", error.error);
this.reconnect();
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
protected onConnectionError(error: Event) { | |
console.error("WebSocket error:", error); | |
this.reconnect(); | |
} | |
protected onConnectionError(error: ErrorEvent) { | |
console.error("WebSocket error:", error); | |
console.error("Error details:", error.error); | |
this.reconnect(); | |
} |
if (this.ws.readyState === WebSocket.OPEN) { | ||
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); | ||
console.log("Sending sync message:", { | ||
messageLength: syncMessage.length, | ||
messageType: syncMessage[0], | ||
rawMessage: Array.from(syncMessage) | ||
}); | ||
this.ws.send(syncMessage); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle WebSocket state changes during sync
The WebSocket state should be checked both before and after preparing the sync message to handle potential disconnections.
- if (this.ws.readyState === WebSocket.OPEN) {
+ const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
+ if (this.ws.readyState !== WebSocket.OPEN) {
+ throw new Error('WebSocket disconnected while preparing sync message');
+ }
+ try {
const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector);
console.log("Sending sync message:", {
messageLength: syncMessage.length,
messageType: syncMessage[0],
rawMessage: Array.from(syncMessage)
});
this.ws.send(syncMessage);
+ } catch (error) {
+ this.socketReady = false;
+ throw error;
}
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
ui/src/lib/yjs/socketYjsManager.ts
(1 hunks)
🔇 Additional comments (6)
ui/src/lib/yjs/socketYjsManager.ts (6)
325-325
:
Change destroy method visibility to public.
The destroy method should be public as it's needed for cleanup when the manager is no longer needed.
- protected async destroy() {
+ public async destroy() {
Likely invalid or redundant comment.
83-84
: 🛠️ Refactor suggestion
Add maximum delay cap to exponential backoff.
The exponential backoff delay grows unbounded, which could lead to very long reconnection delays.
Add a maximum delay cap:
+ const maxDelay = 30000; // 30 seconds maximum delay
const delay = Math.min(
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
+ maxDelay
);
Likely invalid or redundant comment.
215-219
:
Refactor 'isReady' method to eliminate recursion.
The recursive implementation of isReady
can lead to a stack overflow for prolonged waiting periods.
Refactor to use a loop:
async isReady(): Promise<boolean> {
- if (this.socketReady) return true;
- await sleep(100);
- return await this.isReady();
+ while (!this.socketReady) {
+ await sleep(100);
+ }
+ return true;
}
Likely invalid or redundant comment.
370-375
: 🛠️ Refactor suggestion
Add input validation to createBinaryMessage.
The function doesn't validate the MessageType enum value or the data array.
Add validation:
function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array {
+ if (!Object.values(MessageType).includes(type)) {
+ throw new Error(`Invalid message type: ${type}`);
+ }
+ if (!data || !(data instanceof Uint8Array)) {
+ throw new Error('Invalid data: must be a Uint8Array');
+ }
const message = new Uint8Array(data.length + 1);
message[0] = type;
message.set(data, 1);
return message;
}
Likely invalid or redundant comment.
143-147
: 🛠️ Refactor suggestion
Add binary message validation.
The binary message handling lacks validation of the message format and size.
Add proper validation:
if (event.data instanceof ArrayBuffer) {
+ if (event.data.byteLength < 1) {
+ throw new Error('Invalid binary message: empty message');
+ }
+ if (event.data.byteLength > 1024 * 1024) { // 1MB limit
+ throw new Error('Invalid binary message: message too large');
+ }
const update = new Uint8Array(event.data);
Likely invalid or redundant comment.
62-64
: 🛠️ Refactor suggestion
Add connection timeout handling.
The WebSocket connection attempt lacks a timeout, which could leave the application hanging if the server is unresponsive.
Add a timeout mechanism:
+ const connectionTimeout = setTimeout(() => {
+ if (this.ws.readyState === WebSocket.CONNECTING) {
+ this.ws.close();
+ throw new Error('WebSocket connection timeout');
+ }
+ }, 10000); // 10 seconds timeout
+
this.ws = new WebSocket(wsUrl.href);
this.ws.binaryType = "arraybuffer";
+
+ this.ws.addEventListener('open', () => clearTimeout(connectionTimeout), { once: true });
Likely invalid or redundant comment.
wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); | ||
wsUrl.searchParams.set("project_id", data.projectId); | ||
wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Security: Move sensitive data from URL parameters to headers.
Exposing sensitive information like tokens and IDs in URL parameters is a security risk as they can be logged or exposed in various ways.
Consider using a secure WebSocket handshake protocol or moving authentication to a dedicated message after connection.
project_id: this.projectId, | ||
user: { | ||
id: this.doc.clientID.toString(), | ||
name: "", | ||
email: "", | ||
tenant_id: "123" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove hardcoded user information.
Using hardcoded user information is a security risk and could lead to confusion in production.
Add proper user information handling:
- user: {
- id: this.doc.clientID.toString(),
- name: "",
- email: "",
- tenant_id: "123"
- }
+ user: this.userInfo
Add user information to the constructor:
constructor(
doc: Y.Doc,
private userInfo: {
id: string;
tenant_id?: string;
name: string;
email: string;
}
) {
this.doc = doc;
// ... rest of the constructor
}
protected accessTokenProvider: AccessTokenProvider | undefined; | ||
protected projectId: string | undefined; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make required fields non-optional or add proper null checks.
The accessTokenProvider
and projectId
fields are marked as optional but are used throughout the code without proper null checks. This could lead to runtime errors.
Apply this diff:
- protected accessTokenProvider: AccessTokenProvider | undefined;
- protected projectId: string | undefined;
+ private accessTokenProvider!: AccessTokenProvider;
+ private projectId!: string;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
protected accessTokenProvider: AccessTokenProvider | undefined; | |
protected projectId: string | undefined; | |
private accessTokenProvider!: AccessTokenProvider; | |
private projectId!: string; |
Overview
There's a couple of reason why we had to choose this custom-built to work with our specific Rust backend architecture.
Integrates with our session management system
It has
What I've done
I've added
socketYjsManager
to handle event based connection with the websocket backend.What I haven't done
I haven't implemented Awareness on this implementation and There's a few more Event from the backend that hasn't been added just cause they aren't needed immediately.
How I tested
Once this PR is merged, you can run the websocket server in your local and try to play with collab editing.
Screenshot
Which point I want you to review particularly
socketYjsManager.ts
Memo
Summary by CodeRabbit
New Features
Improvements
Bug Fixes
Type Definitions
User
type with a new optionaltenant_id
property for more detailed user information.