From 981c67db7d2ba5871e353442e3879cb32de891f8 Mon Sep 17 00:00:00 2001 From: pyshx Date: Sun, 24 Nov 2024 05:26:38 +0530 Subject: [PATCH 01/17] fix(ui): socket yjs manager & refactor --- ui/src/lib/utils.ts | 4 + ui/src/lib/yjs/socketYjsManager.ts | 301 +++++++++++++++++++++++++++++ ui/src/lib/yjs/types.ts | 23 +++ ui/src/lib/yjs/useYjsStore.ts | 172 ++++++++++------- ui/src/types/user.ts | 1 + 5 files changed, 434 insertions(+), 67 deletions(-) create mode 100644 ui/src/lib/yjs/socketYjsManager.ts diff --git a/ui/src/lib/utils.ts b/ui/src/lib/utils.ts index cffaf97a5..60aace9d9 100644 --- a/ui/src/lib/utils.ts +++ b/ui/src/lib/utils.ts @@ -5,3 +5,7 @@ import { twMerge } from "tailwind-merge"; export function cn(...inputs: ClassValue[]) { return twMerge(clsx(inputs)); } + +export function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts new file mode 100644 index 000000000..6f69908bf --- /dev/null +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -0,0 +1,301 @@ +import * as Y from 'yjs'; + +import { sleep } from '../utils'; + +import type { FlowMessage } from './types'; + +export type AccessTokenProvider = () => Promise | string; + +export class SocketYjsManager { + protected ws!: WebSocket; + protected doc: Y.Doc; + protected socketReady = false; + protected firstSyncComplete = false; + protected accessTokenProvider: AccessTokenProvider | undefined; + protected projectId: string | undefined; + protected onUpdateHandlers: ((update: Uint8Array) => void)[] = []; + protected reconnectAttempts = 0; + protected maxReconnectAttempts = 5; + protected reconnectDelay = 1000; + protected reconnectTimer: ReturnType | null = null; + + constructor(doc: Y.Doc) { + this.doc = doc; + + // Bind methods + this.onConnectionEstablished = this.onConnectionEstablished.bind(this); + this.onConnectionDisconnect = this.onConnectionDisconnect.bind(this); + this.onConnectionError = this.onConnectionError.bind(this); + this.onAuthenticateRequest = this.onAuthenticateRequest.bind(this); + this.onDocUpdate = this.onDocUpdate.bind(this); + this.onReady = this.onReady.bind(this); + this.onPeerUpdate = this.onPeerUpdate.bind(this); + this.handleMessage = this.handleMessage.bind(this); + this.reconnect = this.reconnect.bind(this); + } + + public getDoc(): Y.Doc { + return this.doc; + } + + // Replace the setupSocket method + async setupSocket(data: { + url: string; + roomId: string; + projectId: string; + accessTokenProvider: AccessTokenProvider; + }) { + this.accessTokenProvider = data.accessTokenProvider; + this.projectId = data.projectId; + + try { + const token = await this.accessTokenProvider(); + const wsUrl = new URL(data.url); + wsUrl.protocol = wsUrl.protocol.replace('http', 'ws'); + wsUrl.pathname = `/${data.roomId}`; + + // Add query parameters for authentication + wsUrl.searchParams.set('user_id', this.doc.clientID.toString()); + wsUrl.searchParams.set('project_id', data.projectId); + wsUrl.searchParams.set('token', token); // Pass token as query param since we can't set headers + + this.ws = new WebSocket(wsUrl.href); + this.ws.binaryType = 'arraybuffer'; + + this.setupWebSocketListeners(); + this.setupDocListeners(); + + console.log('Attempting WebSocket connection to:', wsUrl.href); + } catch (error) { + console.error('Failed to setup WebSocket:', error); + throw error; + } + } + + // Update the reconnect method + private reconnect() { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + } + + if (this.reconnectAttempts < this.maxReconnectAttempts) { + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + + this.reconnectTimer = setTimeout(async () => { + try { + if (this.ws) { + const originalUrl = new URL(this.ws.url); + const baseUrl = `${originalUrl.protocol}//${originalUrl.host}`; + const roomId = originalUrl.pathname.slice(1); + await this.setupSocket({ + url: baseUrl, + roomId, + projectId: this.projectId || '', + accessTokenProvider: this.accessTokenProvider || (() => ''), + }); + } + } catch (error) { + console.error('Reconnection failed:', error); + } + }, delay); + } + } + + private setupWebSocketListeners() { + this.ws.addEventListener('open', this.onConnectionEstablished); + this.ws.addEventListener('close', this.onConnectionDisconnect); + this.ws.addEventListener('error', this.onConnectionError); + this.ws.addEventListener('message', this.handleMessage); + } + + private setupDocListeners() { + this.doc.on('update', this.onDocUpdate); + } + + protected onConnectionEstablished() { + this.reconnectAttempts = 0; + this.socketReady = true; + this.initializeRoom().catch(console.error); + } + + protected onConnectionDisconnect() { + this.socketReady = false; + this.firstSyncComplete = false; + this.reconnect(); + } + + protected onConnectionError(error: Event) { + console.error('WebSocket error:', error); + this.reconnect(); + } + + protected async onAuthenticateRequest() { + const token = await this.accessTokenProvider?.(); + if (token && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ type: 'authenticate', token })); + } + } + + protected async handleMessage(event: MessageEvent) { + try { + if (event.data instanceof ArrayBuffer) { + // Handle binary message (Yjs update) + const update = new Uint8Array(event.data); + await this.onPeerUpdate({ update }); + } else if (typeof event.data === 'string') { + // Handle text message + const data = JSON.parse(event.data); + if (data.type === 'authenticate') { + await this.onAuthenticateRequest(); + } else if (data.type === 'ready') { + this.onReady(); + } + } + } catch (error) { + console.error('Error handling message:', error); + } + } + + protected async initializeRoom() { + try { + await this.sendFlowMessage({ + event: { + tag: 'Create', + content: { room_id: this.doc.clientID.toString() } + } + }); + + await this.sendFlowMessage({ + event: { + tag: 'Join', + content: { room_id: this.doc.clientID.toString() } + } + }); + + await this.sendFlowMessage({ + event: { + tag: 'Emit', + content: { data: '' } + }, + session_command: { + tag: 'Start', + content: { + project_id: this.projectId || '', + user: { + id: this.doc.clientID.toString(), + tenant_id: this.projectId, + name: 'defaultName', + email: 'defaultEmail@example.com' + } + } + } + }); + + await this.syncData(); + } catch (error) { + console.error('Failed to initialize room:', error); + } + } + + async isReady(): Promise { + if (this.socketReady) return true; + await sleep(100); + return await this.isReady(); + } + + protected onReady() { + this.socketReady = true; + } + + protected async onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) { + const update = data.update instanceof ArrayBuffer + ? new Uint8Array(data.update) + : data.update; + Y.applyUpdate(this.doc, update, 'peer'); + this.onUpdateHandlers.forEach(handler => handler(update)); + } + + async syncData() { + await this.isReady(); + + const stateVector = Y.encodeStateVector(this.doc); + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(stateVector); + } + + if (!this.firstSyncComplete) { + this.firstSyncComplete = true; + queueMicrotask(() => this.syncData()); + } + } + + private async sendFlowMessage(message: FlowMessage): Promise { + return new Promise((resolve, reject) => { + if (this.ws.readyState !== WebSocket.OPEN) { + reject(new Error('WebSocket is not connected')); + return; + } + + try { + this.ws.send(JSON.stringify(message)); + resolve(); + } catch (error) { + reject(error); + } + }); + } + + protected onDocUpdate(update: Uint8Array, origin: unknown) { + if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { + this.sendFlowMessage({ + event: { + tag: 'Emit', + content: { data: '' } + }, + session_command: { + tag: 'MergeUpdates', + content: { + project_id: this.projectId || '', + data: new Uint8Array(update), + updated_by: this.doc.clientID.toString() + } + } + }); + } + } + + onUpdate(handler: (update: Uint8Array) => void) { + this.onUpdateHandlers.push(handler); + } + + destroy() { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + } + + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.sendFlowMessage({ + event: { + tag: 'Emit', + content: { data: '' } + }, + session_command: { + tag: 'End', + content: { + project_id: this.projectId || '', + user: { + id: this.doc.clientID.toString(), + tenant_id: this.projectId, + name: 'defaultName', + email: 'defaultEmail@example.com' + } + } + } + }).finally(() => { + this.ws.close(); + }); + } + this.doc.destroy(); + } +} diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index 710889a6c..fe0c8ed6b 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -1,3 +1,5 @@ +import { User } from "@flow/types/user"; + export type YJsonPrimitive = string | number | boolean | null | Uint8Array; export type YJsonValue = @@ -6,3 +8,24 @@ export type YJsonValue = | { [key: string]: YJsonValue; }; + +export type FlowMessage = { + event: { + tag: 'Create' | 'Join' | 'Leave' | 'Emit'; + content: { + room_id?: string; + data?: string; + }; + }; + session_command?: SessionCommand; +} + +export type SessionCommand = { + tag: 'Start' | 'End' | 'Complete' | 'CheckStatus' | 'AddTask' | 'RemoveTask' | 'ListAllSnapshotsVersions' | 'MergeUpdates'; + content: { + project_id: string; + user?: User; + data?: Uint8Array; + updated_by?: string; + }; +}; diff --git a/ui/src/lib/yjs/useYjsStore.ts b/ui/src/lib/yjs/useYjsStore.ts index 81f61acdc..346301d8d 100644 --- a/ui/src/lib/yjs/useYjsStore.ts +++ b/ui/src/lib/yjs/useYjsStore.ts @@ -1,6 +1,5 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { useY } from "react-yjs"; -import { WebsocketProvider } from "y-websocket"; import * as Y from "yjs"; import { config } from "@flow/config"; @@ -13,6 +12,7 @@ import { createWorkflowsYaml } from "@flow/utils/engineWorkflowYaml/workflowYaml import { useDeployment } from "../gql/deployment"; import { useT } from "../i18n"; +import { SocketYjsManager } from "./socketYjsManager"; import useWorkflowTabs from "./useWorkflowTabs"; import useYEdge from "./useYEdge"; import useYNode from "./useYNode"; @@ -28,67 +28,77 @@ export default ({ }) => { const { toast } = useToast(); const t = useT(); - const [currentProject] = useCurrentProject(); const { createDeployment, useUpdateDeployment } = useDeployment(); - - const yWebSocketRef = useRef(null); - useEffect(() => () => yWebSocketRef.current?.destroy(), []); - + const managerRef = useRef(null); const [undoManager, setUndoManager] = useState(null); - const [{ yWorkflows, currentUserClientId, undoTrackerActionWrapper }] = - useState(() => { - const yDoc = new Y.Doc(); - const { websocket, websocketToken } = config(); - if (workflowId && websocket && websocketToken) { - yWebSocketRef.current = new WebsocketProvider( - websocket, - workflowId, - yDoc, - { params: { token: websocketToken } }, - ); - } + // Initialize store and connect + const [{ yWorkflows, currentUserClientId }] = useState(() => { + const doc = new Y.Doc(); + const yWorkflows = doc.getArray("workflows"); - const yWorkflows = yDoc.getArray("workflows"); + // Initialize with default workflow if empty + if (yWorkflows.length === 0) { const yWorkflow = yWorkflowBuilder("main", "Main Workflow"); yWorkflows.push([yWorkflow]); + } - const currentUserClientId = yDoc.clientID; - - // NOTE: any changes to the yDoc should be wrapped in a transact - const undoTrackerActionWrapper = (callback: () => void) => - yDoc.transact(callback, currentUserClientId); - - return { yWorkflows, currentUserClientId, undoTrackerActionWrapper }; - }); + return { + yWorkflows, + currentUserClientId: doc.clientID, + }; + }); useEffect(() => { - if (yWorkflows) { - const manager = new Y.UndoManager(yWorkflows, { - trackedOrigins: new Set([currentUserClientId]), // Only track local changes - captureTimeout: 200, // default is 500. 200ms is a good balance between performance and user experience + const { websocket: websocketUrl, websocketToken } = config(); + if (!websocketUrl || !websocketToken || !workflowId || !currentProject?.id) + return; + + const doc = managerRef.current?.getDoc() || yWorkflows.doc; + + // Create and setup socket manager + if (!doc) return; + const manager = new SocketYjsManager(doc); + manager + .setupSocket({ + url: websocketUrl, + roomId: workflowId, + projectId: currentProject.id, + accessTokenProvider: async () => websocketToken, + }) + .catch((error) => { + toast({ + title: t("Connection Error"), + description: error.message, + variant: "destructive", + }); }); - setUndoManager(manager); - - return () => { - manager.destroy(); // Clean up UndoManager on component unmount - }; - } - }, [yWorkflows, currentUserClientId]); - - const handleWorkflowUndo = useCallback(() => { - if (undoManager?.undoStack && undoManager.undoStack.length > 0) { - undoManager?.undo(); - } - }, [undoManager]); - - const handleWorkflowRedo = useCallback(() => { - if (undoManager?.redoStack && undoManager.redoStack.length > 0) { - undoManager?.redo(); - } - }, [undoManager]); + // Setup undo manager + const undoMngr = new Y.UndoManager(yWorkflows, { + trackedOrigins: new Set([currentUserClientId]), + captureTimeout: 200, + }); + setUndoManager(undoMngr); + + managerRef.current = manager; + + return () => { + manager.destroy(); + undoMngr.destroy(); + managerRef.current = null; + }; + }, [ + workflowId, + currentProject?.id, + toast, + t, + currentUserClientId, + yWorkflows, + ]); + + // Get the raw workflows using useY const rawWorkflows = useY(yWorkflows); const { @@ -99,7 +109,21 @@ export default ({ setOpenWorkflowIds, handleWorkflowOpen, handleWorkflowClose, - } = useWorkflowTabs({ workflowId, rawWorkflows, handleWorkflowIdChange }); + } = useWorkflowTabs({ + workflowId, + rawWorkflows: rawWorkflows as Record[], + handleWorkflowIdChange, + }); + + // Create wrapper for undoTrackerActionWrapper + const undoTrackerActionWrapper = useCallback( + (callback: () => void) => { + const doc = managerRef.current?.getDoc(); + if (!doc) return; + doc.transact(callback, currentUserClientId); + }, + [currentUserClientId], + ); const { currentYWorkflow, handleWorkflowAdd, handleWorkflowsRemove } = useYWorkflow({ @@ -120,6 +144,29 @@ export default ({ currentYWorkflow?.get("edges") ?? new Y.Array(), ) as Edge[]; + const { handleNodesUpdate } = useYNode({ + currentYWorkflow, + undoTrackerActionWrapper, + handleWorkflowsRemove, + }); + + const { handleEdgesUpdate } = useYEdge({ + currentYWorkflow, + undoTrackerActionWrapper, + }); + + const handleWorkflowUndo = useCallback(() => { + if (undoManager?.undoStack.length) { + undoManager.undo(); + } + }, [undoManager]); + + const handleWorkflowRedo = useCallback(() => { + if (undoManager?.redoStack.length) { + undoManager.redo(); + } + }, [undoManager]); + const handleWorkflowDeployment = useCallback( async (deploymentId?: string, description?: string) => { const { @@ -135,12 +182,13 @@ export default ({ projectName, rawWorkflows .map((w): Workflow | undefined => { - if (!w || w.nodes.length < 1) return undefined; - const id = w.id as string; - const name = w.name as string; - const n = w.nodes as Node[]; - const e = w.edges as Edge[]; - return { id, name, nodes: n, edges: e }; + if (!w || (w.nodes as Node[]).length < 1) return undefined; + return { + id: w.id as string, + name: w.name as string, + nodes: w.nodes as Node[], + edges: w.edges as Edge[], + }; }) .filter(isDefined), ) ?? {}; @@ -180,16 +228,6 @@ export default ({ ], ); - const { handleNodesUpdate } = useYNode({ - currentYWorkflow, - undoTrackerActionWrapper, - handleWorkflowsRemove, - }); - - const { handleEdgesUpdate } = useYEdge({ - currentYWorkflow, - undoTrackerActionWrapper, - }); return { nodes, diff --git a/ui/src/types/user.ts b/ui/src/types/user.ts index a457e3864..f4d20fa99 100644 --- a/ui/src/types/user.ts +++ b/ui/src/types/user.ts @@ -15,6 +15,7 @@ export type User = { id: string; name: string; email: string; + tenant_id?: string; }; export type SearchUser = { From c1cc87199c3624edb04cb551dfe68bec73ae31ee Mon Sep 17 00:00:00 2001 From: pyshx Date: Sun, 24 Nov 2024 05:31:17 +0530 Subject: [PATCH 02/17] prettier --- ui/src/lib/utils.ts | 2 +- ui/src/lib/yjs/socketYjsManager.ts | 134 +++++++++++++++-------------- ui/src/lib/yjs/types.ts | 14 ++- ui/src/lib/yjs/useYjsStore.ts | 1 - 4 files changed, 80 insertions(+), 71 deletions(-) diff --git a/ui/src/lib/utils.ts b/ui/src/lib/utils.ts index 60aace9d9..2885a4968 100644 --- a/ui/src/lib/utils.ts +++ b/ui/src/lib/utils.ts @@ -7,5 +7,5 @@ export function cn(...inputs: ClassValue[]) { } export function sleep(ms: number) { - return new Promise(resolve => setTimeout(resolve, ms)); + return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index 6f69908bf..4ace9a7c1 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -1,8 +1,8 @@ -import * as Y from 'yjs'; +import * as Y from "yjs"; -import { sleep } from '../utils'; +import { sleep } from "../utils"; -import type { FlowMessage } from './types'; +import type { FlowMessage } from "./types"; export type AccessTokenProvider = () => Promise | string; @@ -51,23 +51,23 @@ export class SocketYjsManager { try { const token = await this.accessTokenProvider(); const wsUrl = new URL(data.url); - wsUrl.protocol = wsUrl.protocol.replace('http', 'ws'); + wsUrl.protocol = wsUrl.protocol.replace("http", "ws"); wsUrl.pathname = `/${data.roomId}`; - + // Add query parameters for authentication - wsUrl.searchParams.set('user_id', this.doc.clientID.toString()); - wsUrl.searchParams.set('project_id', data.projectId); - wsUrl.searchParams.set('token', token); // Pass token as query param since we can't set headers + wsUrl.searchParams.set("user_id", this.doc.clientID.toString()); + wsUrl.searchParams.set("project_id", data.projectId); + wsUrl.searchParams.set("token", token); // Pass token as query param since we can't set headers this.ws = new WebSocket(wsUrl.href); - this.ws.binaryType = 'arraybuffer'; - + this.ws.binaryType = "arraybuffer"; + this.setupWebSocketListeners(); this.setupDocListeners(); - console.log('Attempting WebSocket connection to:', wsUrl.href); + console.log("Attempting WebSocket connection to:", wsUrl.href); } catch (error) { - console.error('Failed to setup WebSocket:', error); + console.error("Failed to setup WebSocket:", error); throw error; } } @@ -80,8 +80,9 @@ export class SocketYjsManager { if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; - const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); - + const delay = + this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + this.reconnectTimer = setTimeout(async () => { try { if (this.ws) { @@ -91,26 +92,26 @@ export class SocketYjsManager { await this.setupSocket({ url: baseUrl, roomId, - projectId: this.projectId || '', - accessTokenProvider: this.accessTokenProvider || (() => ''), + projectId: this.projectId || "", + accessTokenProvider: this.accessTokenProvider || (() => ""), }); } } catch (error) { - console.error('Reconnection failed:', error); + console.error("Reconnection failed:", error); } }, delay); } } private setupWebSocketListeners() { - this.ws.addEventListener('open', this.onConnectionEstablished); - this.ws.addEventListener('close', this.onConnectionDisconnect); - this.ws.addEventListener('error', this.onConnectionError); - this.ws.addEventListener('message', this.handleMessage); + this.ws.addEventListener("open", this.onConnectionEstablished); + this.ws.addEventListener("close", this.onConnectionDisconnect); + this.ws.addEventListener("error", this.onConnectionError); + this.ws.addEventListener("message", this.handleMessage); } private setupDocListeners() { - this.doc.on('update', this.onDocUpdate); + this.doc.on("update", this.onDocUpdate); } protected onConnectionEstablished() { @@ -126,14 +127,14 @@ export class SocketYjsManager { } protected onConnectionError(error: Event) { - console.error('WebSocket error:', error); + console.error("WebSocket error:", error); this.reconnect(); } protected async onAuthenticateRequest() { const token = await this.accessTokenProvider?.(); if (token && this.ws.readyState === WebSocket.OPEN) { - this.ws.send(JSON.stringify({ type: 'authenticate', token })); + this.ws.send(JSON.stringify({ type: "authenticate", token })); } } @@ -143,17 +144,17 @@ export class SocketYjsManager { // Handle binary message (Yjs update) const update = new Uint8Array(event.data); await this.onPeerUpdate({ update }); - } else if (typeof event.data === 'string') { + } else if (typeof event.data === "string") { // Handle text message const data = JSON.parse(event.data); - if (data.type === 'authenticate') { + if (data.type === "authenticate") { await this.onAuthenticateRequest(); - } else if (data.type === 'ready') { + } else if (data.type === "ready") { this.onReady(); } } } catch (error) { - console.error('Error handling message:', error); + console.error("Error handling message:", error); } } @@ -161,40 +162,40 @@ export class SocketYjsManager { try { await this.sendFlowMessage({ event: { - tag: 'Create', - content: { room_id: this.doc.clientID.toString() } - } + tag: "Create", + content: { room_id: this.doc.clientID.toString() }, + }, }); await this.sendFlowMessage({ event: { - tag: 'Join', - content: { room_id: this.doc.clientID.toString() } - } + tag: "Join", + content: { room_id: this.doc.clientID.toString() }, + }, }); await this.sendFlowMessage({ event: { - tag: 'Emit', - content: { data: '' } + tag: "Emit", + content: { data: "" }, }, session_command: { - tag: 'Start', + tag: "Start", content: { - project_id: this.projectId || '', + project_id: this.projectId || "", user: { id: this.doc.clientID.toString(), tenant_id: this.projectId, - name: 'defaultName', - email: 'defaultEmail@example.com' - } - } - } + name: "defaultName", + email: "defaultEmail@example.com", + }, + }, + }, }); await this.syncData(); } catch (error) { - console.error('Failed to initialize room:', error); + console.error("Failed to initialize room:", error); } } @@ -209,11 +210,12 @@ export class SocketYjsManager { } protected async onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) { - const update = data.update instanceof ArrayBuffer - ? new Uint8Array(data.update) - : data.update; - Y.applyUpdate(this.doc, update, 'peer'); - this.onUpdateHandlers.forEach(handler => handler(update)); + const update = + data.update instanceof ArrayBuffer + ? new Uint8Array(data.update) + : data.update; + Y.applyUpdate(this.doc, update, "peer"); + this.onUpdateHandlers.forEach((handler) => handler(update)); } async syncData() { @@ -233,7 +235,7 @@ export class SocketYjsManager { private async sendFlowMessage(message: FlowMessage): Promise { return new Promise((resolve, reject) => { if (this.ws.readyState !== WebSocket.OPEN) { - reject(new Error('WebSocket is not connected')); + reject(new Error("WebSocket is not connected")); return; } @@ -250,17 +252,17 @@ export class SocketYjsManager { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { this.sendFlowMessage({ event: { - tag: 'Emit', - content: { data: '' } + tag: "Emit", + content: { data: "" }, }, session_command: { - tag: 'MergeUpdates', + tag: "MergeUpdates", content: { - project_id: this.projectId || '', + project_id: this.projectId || "", data: new Uint8Array(update), - updated_by: this.doc.clientID.toString() - } - } + updated_by: this.doc.clientID.toString(), + }, + }, }); } } @@ -277,21 +279,21 @@ export class SocketYjsManager { if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.sendFlowMessage({ event: { - tag: 'Emit', - content: { data: '' } + tag: "Emit", + content: { data: "" }, }, session_command: { - tag: 'End', + tag: "End", content: { - project_id: this.projectId || '', + project_id: this.projectId || "", user: { id: this.doc.clientID.toString(), tenant_id: this.projectId, - name: 'defaultName', - email: 'defaultEmail@example.com' - } - } - } + name: "defaultName", + email: "defaultEmail@example.com", + }, + }, + }, }).finally(() => { this.ws.close(); }); diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index fe0c8ed6b..e4a1b85e6 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -11,17 +11,25 @@ export type YJsonValue = export type FlowMessage = { event: { - tag: 'Create' | 'Join' | 'Leave' | 'Emit'; + tag: "Create" | "Join" | "Leave" | "Emit"; content: { room_id?: string; data?: string; }; }; session_command?: SessionCommand; -} +}; export type SessionCommand = { - tag: 'Start' | 'End' | 'Complete' | 'CheckStatus' | 'AddTask' | 'RemoveTask' | 'ListAllSnapshotsVersions' | 'MergeUpdates'; + tag: + | "Start" + | "End" + | "Complete" + | "CheckStatus" + | "AddTask" + | "RemoveTask" + | "ListAllSnapshotsVersions" + | "MergeUpdates"; content: { project_id: string; user?: User; diff --git a/ui/src/lib/yjs/useYjsStore.ts b/ui/src/lib/yjs/useYjsStore.ts index 346301d8d..afeb8a6c8 100644 --- a/ui/src/lib/yjs/useYjsStore.ts +++ b/ui/src/lib/yjs/useYjsStore.ts @@ -228,7 +228,6 @@ export default ({ ], ); - return { nodes, edges, From e855eb49506914829b67762a4b30bc4ebee4a43d Mon Sep 17 00:00:00 2001 From: pyshx Date: Sun, 24 Nov 2024 05:34:57 +0530 Subject: [PATCH 03/17] i28n --- ui/src/lib/i18n/locales/en.json | 1 + ui/src/lib/i18n/locales/es.json | 1 + ui/src/lib/i18n/locales/fr.json | 1 + ui/src/lib/i18n/locales/ja.json | 1 + ui/src/lib/i18n/locales/zh.json | 1 + 5 files changed, 5 insertions(+) diff --git a/ui/src/lib/i18n/locales/en.json b/ui/src/lib/i18n/locales/en.json index 9546b79cd..6a0d5c006 100644 --- a/ui/src/lib/i18n/locales/en.json +++ b/ui/src/lib/i18n/locales/en.json @@ -225,6 +225,7 @@ "Member has been successfully added to the workspace.": "", "Member Removed": "", "Member has been successfully removed from the workspace.": "", + "Connection Error": "", "Empty workflow detected": "", "You cannot create a deployment without a workflow.": "", "Reload": "" diff --git a/ui/src/lib/i18n/locales/es.json b/ui/src/lib/i18n/locales/es.json index 74268c6dc..d5f54dfd8 100644 --- a/ui/src/lib/i18n/locales/es.json +++ b/ui/src/lib/i18n/locales/es.json @@ -225,6 +225,7 @@ "Member has been successfully added to the workspace.": "El miembro ha sido agregado exitosamente al espacio de trabajo.", "Member Removed": "Miembro eliminado", "Member has been successfully removed from the workspace.": "El miembro ha sido eliminado exitosamente del espacio de trabajo.", + "Connection Error": "", "Empty workflow detected": "Se detectó un flujo de trabajo vacío", "You cannot create a deployment without a workflow.": "No puedes crear un despliegue sin un flujo de trabajo.", "Reload": "Recargar" diff --git a/ui/src/lib/i18n/locales/fr.json b/ui/src/lib/i18n/locales/fr.json index c0cc73a14..091c39a14 100644 --- a/ui/src/lib/i18n/locales/fr.json +++ b/ui/src/lib/i18n/locales/fr.json @@ -225,6 +225,7 @@ "Member has been successfully added to the workspace.": "Le membre a été ajouté avec succès à l'espace de travail.", "Member Removed": "Membre supprimé", "Member has been successfully removed from the workspace.": "Le membre a été supprimé avec succès de l'espace de travail.", + "Connection Error": "", "Empty workflow detected": "Flux de travail vide détecté", "You cannot create a deployment without a workflow.": "Vous ne pouvez pas créer de déploiement sans flux de travail.", "Reload": "Recharger" diff --git a/ui/src/lib/i18n/locales/ja.json b/ui/src/lib/i18n/locales/ja.json index f30f1443e..fb3c63946 100644 --- a/ui/src/lib/i18n/locales/ja.json +++ b/ui/src/lib/i18n/locales/ja.json @@ -225,6 +225,7 @@ "Member has been successfully added to the workspace.": "メンバーがワークスペースに正常に追加されました。", "Member Removed": "メンバー削除", "Member has been successfully removed from the workspace.": "メンバーがワークスペースから正常に削除されました。", + "Connection Error": "", "Empty workflow detected": "空のワークフローが検出されました", "You cannot create a deployment without a workflow.": "ワークフローがないとデプロイメントを作成できません。", "Reload": "リロード" diff --git a/ui/src/lib/i18n/locales/zh.json b/ui/src/lib/i18n/locales/zh.json index cdab61366..2a86455f8 100644 --- a/ui/src/lib/i18n/locales/zh.json +++ b/ui/src/lib/i18n/locales/zh.json @@ -225,6 +225,7 @@ "Member has been successfully added to the workspace.": "成员已成功添加到工作区。", "Member Removed": "成员已移除", "Member has been successfully removed from the workspace.": "成员已成功从工作区中移除。", + "Connection Error": "", "Empty workflow detected": "检测到空工作流", "You cannot create a deployment without a workflow.": "没有工作流,您无法创建部署。", "Reload": "重新加载" From a8cbafeb66e40aabcfe8b6f483bcf9e490648795 Mon Sep 17 00:00:00 2001 From: pyshx Date: Sun, 24 Nov 2024 12:51:18 +0530 Subject: [PATCH 04/17] update comment --- ui/src/lib/yjs/socketYjsManager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index 4ace9a7c1..5bb020628 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -65,7 +65,7 @@ export class SocketYjsManager { this.setupWebSocketListeners(); this.setupDocListeners(); - console.log("Attempting WebSocket connection to:", wsUrl.href); + console.log("Attempting WebSocket connection to:", wsUrl.origin + wsUrl.pathname); } catch (error) { console.error("Failed to setup WebSocket:", error); throw error; From 7a606b0be93b1ea99fc9e65e2d3ea93d23af4824 Mon Sep 17 00:00:00 2001 From: pyshx Date: Mon, 25 Nov 2024 02:40:30 +0530 Subject: [PATCH 05/17] use v2 method on FE and debug comments on backend --- ui/src/lib/yjs/socketYjsManager.ts | 30 +-- websocket/app/examples/session_client.rs | 2 +- .../infra/src/persistence/redis/updates.rs | 119 +++++++++-- .../src/manage_project_edit_session.rs | 199 ++++++++++++------ 4 files changed, 245 insertions(+), 105 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index 5bb020628..cb419a034 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -209,19 +209,22 @@ export class SocketYjsManager { this.socketReady = true; } - protected async onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) { - const update = - data.update instanceof ArrayBuffer - ? new Uint8Array(data.update) - : data.update; - Y.applyUpdate(this.doc, update, "peer"); + protected onPeerUpdate(data: { update: ArrayBuffer | Uint8Array }) { + const update = data.update instanceof ArrayBuffer + ? new Uint8Array(data.update) + : data.update; + + const currentState = Y.encodeStateAsUpdateV2(this.doc); + const diffUpdate = Y.diffUpdateV2(update, currentState); + Y.applyUpdateV2(this.doc, diffUpdate, 'peer'); this.onUpdateHandlers.forEach((handler) => handler(update)); } async syncData() { await this.isReady(); - const stateVector = Y.encodeStateVector(this.doc); + const currentState = Y.encodeStateAsUpdateV2(this.doc); + const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); if (this.ws.readyState === WebSocket.OPEN) { this.ws.send(stateVector); } @@ -250,19 +253,22 @@ export class SocketYjsManager { protected onDocUpdate(update: Uint8Array, origin: unknown) { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { + const stateVector = Y.encodeStateVectorFromUpdateV2(update); + const diffUpdate = Y.diffUpdateV2(update, stateVector); + this.sendFlowMessage({ event: { tag: "Emit", - content: { data: "" }, + content: { data: "" } }, session_command: { tag: "MergeUpdates", content: { project_id: this.projectId || "", - data: new Uint8Array(update), - updated_by: this.doc.clientID.toString(), - }, - }, + data: new Uint8Array(diffUpdate), + updated_by: this.doc.clientID.toString() + } + } }); } } diff --git a/websocket/app/examples/session_client.rs b/websocket/app/examples/session_client.rs index 3eed1dd4a..e32485d64 100644 --- a/websocket/app/examples/session_client.rs +++ b/websocket/app/examples/session_client.rs @@ -71,7 +71,7 @@ async fn main() -> Result<(), Box> { let room_id = "room123"; let url = Url::parse(&format!( - "ws://127.0.0.1:8080/{room_id}?user_id={user_id}&project_id={project_id}", + "ws://127.0.0.1:8080/{room_id}?user_id={user_id}&project_id={project_id}&token=nyaan", room_id = room_id, user_id = user_id, project_id = project_id diff --git a/websocket/crates/infra/src/persistence/redis/updates.rs b/websocket/crates/infra/src/persistence/redis/updates.rs index 02bac1870..b1fdc9426 100644 --- a/websocket/crates/infra/src/persistence/redis/updates.rs +++ b/websocket/crates/infra/src/persistence/redis/updates.rs @@ -101,35 +101,110 @@ impl UpdateManager { redis_data: Option>, new_update_by: Option, ) -> Result<(Vec, Vec), FlowProjectRedisDataManagerError> { - let (stream_update, stream_updates_by) = - (self.get_merged_update_from_stream(project_id).await?).unwrap_or_default(); - + debug!( + "Starting merge_updates_internal for project_id: {}, has_redis_data: {}, new_update_by: {:?}", + project_id, + redis_data.is_some(), + new_update_by + ); + + let (stream_update, stream_updates_by) = match self.get_merged_update_from_stream(project_id).await { + Ok(Some((update, updates_by))) => { + debug!( + "Retrieved stream update for project {}: size={}, updates_by={:?}", + project_id, + update.len(), + updates_by + ); + (update, updates_by) + } + Ok(None) => { + debug!("No existing stream updates found for project {}", project_id); + Default::default() + } + Err(e) => { + debug!( + "Error getting merged update from stream for project {}: {:?}", + project_id, e + ); + return Err(e); + } + }; + let redis_update = redis_data.unwrap_or_default(); - - let optimized_merged_state = tokio::task::spawn_blocking( - move || -> Result, FlowProjectRedisDataManagerError> { - let doc = Doc::new(); - let mut txn = doc.transact_mut(); - - if !redis_update.is_empty() { - txn.apply_update(Update::decode_v2(&redis_update)?)?; + debug!( + "Redis update size for project {}: {}", + project_id, + redis_update.len() + ); + + debug!( + "Spawning blocking task to merge updates for project {}", + project_id + ); + let optimized_merged_state = tokio::task::spawn_blocking(move || { + debug!("Starting merge operation in blocking task"); + let doc = Doc::new(); + let mut txn = doc.transact_mut(); + + if !redis_update.is_empty() { + debug!("Applying redis update of size {}", redis_update.len()); + match Update::decode_v2(&redis_update) { + Ok(update) => { + if let Err(e) = txn.apply_update(update) { + debug!("Error applying redis update: {:?}", e); + return Err(FlowProjectRedisDataManagerError::from(e)); + } + } + Err(e) => { + debug!("Error decoding redis update: {:?}", e); + return Err(FlowProjectRedisDataManagerError::from(e)); + } } - - if !stream_update.is_empty() { - txn.apply_update(Update::decode_v2(&stream_update)?)?; + } + + if !stream_update.is_empty() { + debug!("Applying stream update of size {}", stream_update.len()); + match Update::decode_v2(&stream_update) { + Ok(update) => { + if let Err(e) = txn.apply_update(update) { + debug!("Error applying stream update: {:?}", e); + return Err(FlowProjectRedisDataManagerError::from(e)); + } + } + Err(e) => { + debug!("Error decoding stream update: {:?}", e); + return Err(FlowProjectRedisDataManagerError::from(e)); + } } - - Ok(txn.encode_update_v2()) - }, - ) - .await??; - + } + + let result = txn.encode_update_v2(); + debug!("Successfully encoded merged update of size {}", result.len()); + Ok(result) + }) + .await + .map_err(|e| { + debug!("Join error from blocking task: {:?}", e); + FlowProjectRedisDataManagerError::from(e) + })??; + let mut updates_by = stream_updates_by; if let Some(new_update_by) = new_update_by { + debug!( + "Adding new update attribution for project {}: {}", + project_id, new_update_by + ); updates_by.push(new_update_by); } - - debug!("Final merged state: {:?}", optimized_merged_state); + + debug!( + "Completed merge_updates_internal for project {}: final_size={}, updates_by={:?}", + project_id, + optimized_merged_state.len(), + updates_by + ); + Ok((optimized_merged_state, updates_by)) } } diff --git a/websocket/crates/services/src/manage_project_edit_session.rs b/websocket/crates/services/src/manage_project_edit_session.rs index 63fbdb811..66e0634db 100644 --- a/websocket/crates/services/src/manage_project_edit_session.rs +++ b/websocket/crates/services/src/manage_project_edit_session.rs @@ -148,87 +148,146 @@ where &self, result: Result, ) -> Result<(), ProjectServiceError> { + debug!("Handling command: {:?}", result); + match result { - Ok(command) => match command { - SessionCommand::Start { project_id, user } => { - self.handle_session_start(&project_id, user).await?; - } - SessionCommand::End { project_id, user } => { - self.handle_session_end(&project_id, user).await?; - } - SessionCommand::Complete { project_id, user } => { - if let Some(mut session) = self.get_latest_session(&project_id).await? { - self.complete_job_if_met_requirements(&mut session).await?; + Ok(command) => { + debug!("Processing command: {:?}", command); + match command { + SessionCommand::Start { project_id, user } => { + debug!("Starting session for project: {}, user: {:?}", project_id, user); + let result = self.handle_session_start(&project_id, user).await; + debug!("Session start result: {:?}", result); + result?; + } + SessionCommand::End { project_id, user } => { + debug!("Ending session for project: {}, user: {:?}", project_id, user); + let result = self.handle_session_end(&project_id, user).await; + debug!("Session end result: {:?}", result); + result?; + } + SessionCommand::Complete { project_id, user } => { + debug!("Completing session for project: {}, user: {:?}", project_id, user); + if let Some(mut session) = self.get_latest_session(&project_id).await? { + debug!("Found latest session: {:?}", session); + let result = self.complete_job_if_met_requirements(&mut session).await; + debug!("Job completion result: {:?}", result); + result?; + debug!( + "Job completed by user: {} for project: {}", + user.id, project_id + ); + } else { + debug!("No session found for project: {}", project_id); + } + } + SessionCommand::MergeUpdates { + project_id, + data, + updated_by, + } => { debug!( - "Job completed by user: {} for project: {}", - user.id, project_id + "Merging updates for project: {}, updated by: {:?}, data length: {}", + project_id, + updated_by, + data.len() ); + let result = self.project_service + .merge_updates(&project_id, data, updated_by) + .await; + debug!("Merge updates result: {:?}", result); + result?; + } + SessionCommand::CheckStatus { project_id } => { + debug!("Checking session status for project: {}", project_id); + } + SessionCommand::AddTask { project_id } => { + debug!("Adding task for project: {}", project_id); + let result = self.add_task(&project_id).await; + debug!("Add task result: {:?}", result); + result?; + } + SessionCommand::RemoveTask { project_id } => { + debug!("Removing task for project: {}", project_id); + let result = self.remove_task(&project_id).await; + debug!("Remove task result: {:?}", result); + result?; + } + SessionCommand::ListAllSnapshotsVersions { project_id } => { + debug!("Listing snapshots versions for project: {}", project_id); + let versions = self + .project_service + .list_all_snapshots_versions(&project_id) + .await?; + debug!( + "Retrieved {} snapshots versions for project {}: {:?}", + versions.len(), + project_id, + versions + ); + } + // Workspace related commands + SessionCommand::CreateWorkspace { workspace } => { + debug!("Creating workspace: {:?}", workspace); + let result = self.project_service.create_workspace(workspace.clone()).await; + debug!("Create workspace result for {}: {:?}", workspace.id, result); + result?; + } + SessionCommand::DeleteWorkspace { workspace_id } => { + debug!("Deleting workspace: {}", workspace_id); + let result = self.project_service.delete_workspace(&workspace_id).await; + debug!("Delete workspace result: {:?}", result); + result?; + } + SessionCommand::UpdateWorkspace { workspace } => { + debug!("Updating workspace: {:?}", workspace); + let result = self.project_service.update_workspace(workspace.clone()).await; + debug!("Update workspace result for {}: {:?}", workspace.id, result); + result?; + } + SessionCommand::ListWorkspaceProjectsIds { workspace_id } => { + debug!("Listing projects for workspace: {}", workspace_id); + let projects = self + .project_service + .list_workspace_projects_ids(&workspace_id) + .await?; + debug!( + "Retrieved {} projects for workspace {}: {:?}", + projects.len(), + workspace_id, + projects + ); + } + // Project related commands + SessionCommand::CreateProject { project } => { + debug!("Creating project: {:?}", project); + let result = self.project_service.create_project(project.clone()).await; + debug!("Create project result for {}: {:?}", project.id, result); + result?; + } + SessionCommand::DeleteProject { project_id } => { + debug!("Deleting project: {}", project_id); + let result = self.project_service.delete_project(&project_id).await; + debug!("Delete project result: {:?}", result); + result?; + } + SessionCommand::UpdateProject { project } => { + debug!("Updating project: {:?}", project); + let result = self.project_service.update_project(project.clone()).await; + debug!("Update project result for {}: {:?}", project.id, result); + result?; } } - SessionCommand::MergeUpdates { - project_id, - data, - updated_by, - } => { - self.project_service - .merge_updates(&project_id, data, updated_by) - .await?; - } - SessionCommand::CheckStatus { project_id } => { - debug!("Checking session status for project: {}", project_id); - } - SessionCommand::AddTask { project_id } => { - self.add_task(&project_id).await?; - } - SessionCommand::RemoveTask { project_id } => { - self.remove_task(&project_id).await?; - } - SessionCommand::ListAllSnapshotsVersions { project_id } => { - let versions = self - .project_service - .list_all_snapshots_versions(&project_id) - .await?; - debug!( - "Snapshots versions for project {}: {:?}", - project_id, versions - ); - } - // Workspace related commands - SessionCommand::CreateWorkspace { workspace } => { - self.project_service.create_workspace(workspace).await?; - } - SessionCommand::DeleteWorkspace { workspace_id } => { - self.project_service.delete_workspace(&workspace_id).await?; - } - SessionCommand::UpdateWorkspace { workspace } => { - self.project_service.update_workspace(workspace).await?; - } - SessionCommand::ListWorkspaceProjectsIds { workspace_id } => { - let projects = self - .project_service - .list_workspace_projects_ids(&workspace_id) - .await?; - debug!("Projects for workspace {}: {:?}", workspace_id, projects); - } - // Project related commands - SessionCommand::CreateProject { project } => { - self.project_service.create_project(project).await?; - } - SessionCommand::DeleteProject { project_id } => { - self.project_service.delete_project(&project_id).await?; - } - SessionCommand::UpdateProject { project } => { - self.project_service.update_project(project).await?; - } - }, + } Err(broadcast::error::RecvError::Closed) => { - debug!("Command channel closed"); + debug!("Command channel closed, waiting before retry"); sleep(Duration::from_secs(1)).await; } Err(broadcast::error::RecvError::Lagged(n)) => { - debug!("Receiver lagged behind by {} messages", n); + debug!("Receiver lagged behind by {} messages, some commands may have been dropped", n); } } + debug!("Command handling completed successfully"); Ok(()) } From b6365079da7bf330ed7eaa1dc9d037813775e7fa Mon Sep 17 00:00:00 2001 From: xy Date: Mon, 25 Nov 2024 19:36:48 +0900 Subject: [PATCH 06/17] Add support for msg type --- ui/src/lib/yjs/socketYjsManager.ts | 32 ++++++++++++++++-------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index cb419a034..d44c13138 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -6,6 +6,18 @@ import type { FlowMessage } from "./types"; export type AccessTokenProvider = () => Promise | string; +enum MessageType { + UPDATE = 1, + SYNC = 2, +} + +function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { + const message = new Uint8Array(data.length + 1); + message[0] = type; + message.set(data, 1); + return message; +} + export class SocketYjsManager { protected ws!: WebSocket; protected doc: Y.Doc; @@ -225,8 +237,10 @@ export class SocketYjsManager { const currentState = Y.encodeStateAsUpdateV2(this.doc); const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); + if (this.ws.readyState === WebSocket.OPEN) { - this.ws.send(stateVector); + const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); + this.ws.send(syncMessage); } if (!this.firstSyncComplete) { @@ -256,20 +270,8 @@ export class SocketYjsManager { const stateVector = Y.encodeStateVectorFromUpdateV2(update); const diffUpdate = Y.diffUpdateV2(update, stateVector); - this.sendFlowMessage({ - event: { - tag: "Emit", - content: { data: "" } - }, - session_command: { - tag: "MergeUpdates", - content: { - project_id: this.projectId || "", - data: new Uint8Array(diffUpdate), - updated_by: this.doc.clientID.toString() - } - } - }); + const updateMessage = createBinaryMessage(MessageType.UPDATE, diffUpdate); + this.ws.send(updateMessage); } } From 839ad71842d7e30335d27c9120805d81b374b074 Mon Sep 17 00:00:00 2001 From: xy Date: Mon, 25 Nov 2024 20:08:17 +0900 Subject: [PATCH 07/17] Update msg type --- ui/src/lib/yjs/socketYjsManager.ts | 20 ++------------------ ui/src/lib/yjs/types.ts | 2 -- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index d44c13138..53a7f2293 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -193,15 +193,7 @@ export class SocketYjsManager { }, session_command: { tag: "Start", - content: { - project_id: this.projectId || "", - user: { - id: this.doc.clientID.toString(), - tenant_id: this.projectId, - name: "defaultName", - email: "defaultEmail@example.com", - }, - }, + content: {}, }, }); @@ -292,15 +284,7 @@ export class SocketYjsManager { }, session_command: { tag: "End", - content: { - project_id: this.projectId || "", - user: { - id: this.doc.clientID.toString(), - tenant_id: this.projectId, - name: "defaultName", - email: "defaultEmail@example.com", - }, - }, + content: {}, }, }).finally(() => { this.ws.close(); diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index e4a1b85e6..39fef50c4 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -31,9 +31,7 @@ export type SessionCommand = { | "ListAllSnapshotsVersions" | "MergeUpdates"; content: { - project_id: string; user?: User; data?: Uint8Array; - updated_by?: string; }; }; From 0eb3ea159c97fe72bad39505b78b86e9c52bf780 Mon Sep 17 00:00:00 2001 From: xy Date: Tue, 26 Nov 2024 01:28:07 +0900 Subject: [PATCH 08/17] Add project id for start --- ui/src/lib/yjs/socketYjsManager.ts | 2 +- ui/src/lib/yjs/types.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index 53a7f2293..a20363bb3 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -193,7 +193,7 @@ export class SocketYjsManager { }, session_command: { tag: "Start", - content: {}, + content: { project_id: this.projectId }, }, }); diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index 39fef50c4..837782ef7 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -31,6 +31,7 @@ export type SessionCommand = { | "ListAllSnapshotsVersions" | "MergeUpdates"; content: { + project_id?: string; user?: User; data?: Uint8Array; }; From 596f47ce58d7c29b5c6e13bccc50bf3a37138903 Mon Sep 17 00:00:00 2001 From: xy Date: Tue, 26 Nov 2024 01:31:01 +0900 Subject: [PATCH 09/17] Remove MessageType to types --- ui/src/lib/yjs/socketYjsManager.ts | 14 +------------- ui/src/lib/yjs/types.ts | 12 ++++++++++++ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index a20363bb3..71161bb29 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -2,22 +2,10 @@ import * as Y from "yjs"; import { sleep } from "../utils"; -import type { FlowMessage } from "./types"; +import { createBinaryMessage, MessageType, type FlowMessage } from "./types"; export type AccessTokenProvider = () => Promise | string; -enum MessageType { - UPDATE = 1, - SYNC = 2, -} - -function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { - const message = new Uint8Array(data.length + 1); - message[0] = type; - message.set(data, 1); - return message; -} - export class SocketYjsManager { protected ws!: WebSocket; protected doc: Y.Doc; diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index 837782ef7..7f82e392b 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -36,3 +36,15 @@ export type SessionCommand = { data?: Uint8Array; }; }; + +export enum MessageType { + UPDATE = 1, + SYNC = 2, +} + +export function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { + const message = new Uint8Array(data.length + 1); + message[0] = type; + message.set(data, 1); + return message; +} From 9055fa2caf338ef9c19afa9a96d3c35e58232b9c Mon Sep 17 00:00:00 2001 From: pyshx Date: Tue, 26 Nov 2024 13:42:50 +0530 Subject: [PATCH 10/17] remove encode method from onupdate --- ui/src/lib/yjs/socketYjsManager.ts | 14 +++++++++----- ui/src/lib/yjs/types.ts | 7 ------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index 71161bb29..eb1cd5188 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -2,7 +2,7 @@ import * as Y from "yjs"; import { sleep } from "../utils"; -import { createBinaryMessage, MessageType, type FlowMessage } from "./types"; +import { MessageType, type FlowMessage } from "./types"; export type AccessTokenProvider = () => Promise | string; @@ -247,10 +247,7 @@ export class SocketYjsManager { protected onDocUpdate(update: Uint8Array, origin: unknown) { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { - const stateVector = Y.encodeStateVectorFromUpdateV2(update); - const diffUpdate = Y.diffUpdateV2(update, stateVector); - - const updateMessage = createBinaryMessage(MessageType.UPDATE, diffUpdate); + const updateMessage = createBinaryMessage(MessageType.UPDATE, update); this.ws.send(updateMessage); } } @@ -281,3 +278,10 @@ export class SocketYjsManager { this.doc.destroy(); } } + +function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { + const message = new Uint8Array(data.length + 1); + message[0] = type; + message.set(data, 1); + return message; +} diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index 7f82e392b..6adc3ea04 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -41,10 +41,3 @@ export enum MessageType { UPDATE = 1, SYNC = 2, } - -export function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { - const message = new Uint8Array(data.length + 1); - message[0] = type; - message.set(data, 1); - return message; -} From c08e26d38fc8c5727f3f0c09ec86338d079d26e1 Mon Sep 17 00:00:00 2001 From: xy Date: Tue, 26 Nov 2024 18:13:17 +0900 Subject: [PATCH 11/17] Fix missing content issue --- ui/src/lib/yjs/socketYjsManager.ts | 79 ++++++++++++++++++++++++------ ui/src/lib/yjs/types.ts | 1 + 2 files changed, 66 insertions(+), 14 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index eb1cd5188..0608d6ee0 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -181,7 +181,28 @@ export class SocketYjsManager { }, session_command: { tag: "Start", - content: { project_id: this.projectId }, + content: { + project_id: this.projectId, + user: { + id: this.doc.clientID.toString(), + name: "", + email: "", + tenant_id: "123" + } + }, + }, + }); + + await this.sendFlowMessage({ + event: { + tag: "Emit", + content: { data: "" }, + }, + session_command: { + tag: "AddTask", + content: { + project_id: this.projectId, + }, }, }); @@ -237,9 +258,17 @@ export class SocketYjsManager { } try { - this.ws.send(JSON.stringify(message)); + const messageStr = JSON.stringify(message); + console.log("Sending message:", { + type: message.event.tag, + sessionCommand: message.session_command?.tag, + content: message, + rawMessage: messageStr + }); + this.ws.send(messageStr); resolve(); } catch (error) { + console.error("Failed to send message:", error); reject(error); } }); @@ -256,26 +285,48 @@ export class SocketYjsManager { this.onUpdateHandlers.push(handler); } - destroy() { + protected async destroy() { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); } if (this.ws && this.ws.readyState === WebSocket.OPEN) { - this.sendFlowMessage({ - event: { - tag: "Emit", - content: { data: "" }, - }, - session_command: { - tag: "End", - content: {}, - }, - }).finally(() => { + console.log("Starting cleanup process..."); + try { + await this.sendFlowMessage({ + event: { + tag: "Emit", + content: { + data: "", + user: { + id: this.doc.clientID.toString(), + name: "", + email: "" + } + }, + }, + session_command: { + tag: "End", + content: { + project_id: this.projectId, + user: { + id: this.doc.clientID.toString(), + name: "", + email: "" + } + }, + }, + }); + console.log("Cleanup message sent successfully"); + } catch (error) { + console.error("Failed to send cleanup message:", error); + } finally { this.ws.close(); - }); + console.log("WebSocket connection closed"); + } } this.doc.destroy(); + console.log("Document destroyed"); } } diff --git a/ui/src/lib/yjs/types.ts b/ui/src/lib/yjs/types.ts index 6adc3ea04..1cc57afc9 100644 --- a/ui/src/lib/yjs/types.ts +++ b/ui/src/lib/yjs/types.ts @@ -15,6 +15,7 @@ export type FlowMessage = { content: { room_id?: string; data?: string; + user?: User; }; }; session_command?: SessionCommand; From 1db64b0955bc0e90882fa779b5e95831e865a66a Mon Sep 17 00:00:00 2001 From: pyshx Date: Tue, 26 Nov 2024 16:39:50 +0530 Subject: [PATCH 12/17] remove websocket changes --- websocket/Cargo.lock | 29 ++ websocket/Cargo.toml | 1 + websocket/app/Cargo.toml | 3 +- websocket/app/examples/room_client.rs | 7 +- websocket/app/examples/session_client.rs | 111 ++++--- websocket/app/src/errors.rs | 4 +- websocket/app/src/handlers/cleanup.rs | 54 ++-- websocket/app/src/handlers/message_handler.rs | 69 ++-- websocket/app/src/handlers/mod.rs | 3 +- websocket/app/src/handlers/room_handler.rs | 95 ++++++ websocket/app/src/handlers/socket_handler.rs | 62 +++- websocket/app/src/handlers/types.rs | 44 ++- websocket/app/src/lib.rs | 1 + websocket/app/src/state.rs | 2 +- .../redis/flow_project_redis_data_manager.rs | 18 +- .../infra/src/persistence/redis/updates.rs | 186 +++++------ .../infra/src/persistence/repository.rs | 5 + .../services/examples/edit_session_service.rs | 24 +- websocket/crates/services/src/lib.rs | 1 + .../src/manage_project_edit_session.rs | 294 ++++++------------ websocket/crates/services/src/project.rs | 12 + websocket/crates/services/src/types.rs | 60 ++++ 22 files changed, 656 insertions(+), 429 deletions(-) diff --git a/websocket/Cargo.lock b/websocket/Cargo.lock index 05affd8b7..138853ed6 100644 --- a/websocket/Cargo.lock +++ b/websocket/Cargo.lock @@ -71,6 +71,7 @@ dependencies = [ "envy", "flow-websocket-infra", "flow-websocket-services", + "futures", "futures-util", "rand", "redis", @@ -660,6 +661,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -667,6 +683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -675,6 +692,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -710,6 +738,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", diff --git a/websocket/Cargo.toml b/websocket/Cargo.toml index 0419a681d..303a9b9cf 100644 --- a/websocket/Cargo.toml +++ b/websocket/Cargo.toml @@ -46,6 +46,7 @@ bb8-redis = "0.17.0" chrono = { version = "0.4", features = ["serde"] } dotenv = "0.15.0" envy = "0.4.2" +futures = "0.3.4" futures-util = "0.3" google-cloud-storage = "0.22.1" lru = "0.12.4" diff --git a/websocket/app/Cargo.toml b/websocket/app/Cargo.toml index 8b55f7e6a..4c3388176 100644 --- a/websocket/app/Cargo.toml +++ b/websocket/app/Cargo.toml @@ -38,4 +38,5 @@ futures-util.workspace = true url.workspace = true base64.workspace = true rand.workspace = true -serde_yaml.workspace = true \ No newline at end of file +serde_yaml.workspace = true +futures.workspace = true \ No newline at end of file diff --git a/websocket/app/examples/room_client.rs b/websocket/app/examples/room_client.rs index 118037d33..d8a577470 100644 --- a/websocket/app/examples/room_client.rs +++ b/websocket/app/examples/room_client.rs @@ -5,7 +5,6 @@ use tokio_tungstenite::{connect_async_with_config, tungstenite::http::Request}; use tracing::error; use tracing::info; use url::Url; -// Add these struct definitions at the top #[derive(Serialize)] struct Event { event: EventData, @@ -35,15 +34,15 @@ struct EmitContent { #[tokio::main] async fn main() -> Result<(), Box> { - let project_id = "test_project"; let user_id = "test_user"; let room_id = "room123"; + let token = "nyaan"; let url = Url::parse(&format!( - "ws://127.0.0.1:8080/{room_id}?user_id={user_id}&project_id={project_id}", + "ws://127.0.0.1:8080/{room_id}?user_id={user_id}&token={token}", room_id = room_id, user_id = user_id, - project_id = project_id + token = token, ))?; let request = Request::builder() diff --git a/websocket/app/examples/session_client.rs b/websocket/app/examples/session_client.rs index e32485d64..56a65026f 100644 --- a/websocket/app/examples/session_client.rs +++ b/websocket/app/examples/session_client.rs @@ -1,10 +1,13 @@ +use app::MessageType; +use flow_websocket_infra::types::user::User; use futures_util::{SinkExt, StreamExt}; use serde::Serialize; use tokio_tungstenite::tungstenite::http::Request; use tokio_tungstenite::{connect_async_with_config, tungstenite::Message}; use tracing::{error, info}; use url::Url; -use yrs::{Doc, Text, Transact}; +use yrs::ReadTxn; +use yrs::{updates::encoder::Encode, Doc, Text, Transact}; #[derive(Serialize)] #[serde(tag = "tag", content = "content")] @@ -21,8 +24,9 @@ struct FlowMessage { session_command: Option, } -#[derive(Serialize)] -enum SessionCommand { +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "tag", content = "content")] +pub enum SessionCommand { Start { project_id: String, user: User, @@ -47,38 +51,48 @@ enum SessionCommand { ListAllSnapshotsVersions { project_id: String, }, + #[warn(dead_code)] MergeUpdates { project_id: String, data: Vec, updated_by: Option, }, + ProcessStateVector { + project_id: String, + state_vector: Vec, + }, } -#[derive(Serialize, Clone)] -struct User { - id: String, - email: Option, - name: Option, - tenant_id: String, +// #[derive(Serialize, Clone)] +// struct User { +// id: String, +// email: Option, +// name: Option, +// tenant_id: String, +// } + +fn create_binary_message(msg_type: MessageType, data: Vec) -> Vec { + let mut message = Vec::with_capacity(data.len() + 1); + message.push(msg_type._as_byte()); + message.extend_from_slice(&data); + message } #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); - let project_id = "test_project"; let user_id = "test_user"; let room_id = "room123"; + let auth_token = "nyaan"; let url = Url::parse(&format!( - "ws://127.0.0.1:8080/{room_id}?user_id={user_id}&project_id={project_id}&token=nyaan", + "ws://127.0.0.1:8080/{room_id}?user_id={user_id}&token={token}", room_id = room_id, user_id = user_id, - project_id = project_id + token = auth_token ))?; - let auth_token = "your_auth_token_here"; - let request = Request::builder() .uri(url.as_str()) .header("Host", url.host_str().unwrap()) @@ -86,7 +100,6 @@ async fn main() -> Result<(), Box> { .header("Upgrade", "websocket") .header("Sec-WebSocket-Version", "13") .header("Sec-WebSocket-Key", generate_key()) - .header("Authorization", format!("Bearer {}", auth_token)) .body(())?; let (ws_stream, _) = connect_async_with_config(request, None, false).await?; @@ -128,17 +141,24 @@ async fn main() -> Result<(), Box> { ) .await?; - let test_user = User { - id: user_id.to_string(), - email: Some("test.user@example.com".to_string()), - name: Some("Test User".to_string()), - tenant_id: "test_tenant".to_string(), - }; + // let test_user = User { + // id: user_id.to_string(), + // email: Some("test.user@example.com".to_string()), + // name: Some("Test User".to_string()), + // tenant_id: "test_tenant".to_string(), + // }; + + let project_id = "test_project3".to_string(); + let user = User::new( + user_id.to_string(), + None, // email + None, // name + ); send_command( &mut write, SessionCommand::AddTask { - project_id: project_id.to_string(), + project_id: project_id.clone(), }, ) .await?; @@ -147,8 +167,8 @@ async fn main() -> Result<(), Box> { send_command( &mut write, SessionCommand::Start { - project_id: project_id.to_string(), - user: test_user.clone(), + project_id: project_id.clone(), + user: user.clone(), }, ) .await?; @@ -157,10 +177,21 @@ async fn main() -> Result<(), Box> { let doc = Doc::new(); let text = doc.get_or_insert_text("test"); + let state_vector = { + let txn = doc.transact(); + let state_vector = txn.state_vector(); + let encode = state_vector.encode_v2(); + create_binary_message(MessageType::Sync, encode) + }; + + write.send(Message::Binary(state_vector)).await?; + info!("State vector sent"); + let update1 = { let mut txn = doc.transact_mut(); text.push(&mut txn, "Hello, YJS!"); - txn.encode_update_v2() + let update = txn.encode_update_v2(); + create_binary_message(MessageType::Update, update) }; write.send(Message::Binary(update1)).await?; @@ -169,7 +200,8 @@ async fn main() -> Result<(), Box> { let update2 = { let mut txn = doc.transact_mut(); text.push(&mut txn, " More text!"); - txn.encode_update_v2() + let update = txn.encode_update_v2(); + create_binary_message(MessageType::Update, update) }; write.send(Message::Binary(update2)).await?; @@ -178,25 +210,18 @@ async fn main() -> Result<(), Box> { let update_data = { let mut txn = doc.transact_mut(); text.push(&mut txn, "Hello from merge update!"); - txn.encode_update_v2() + let update = txn.encode_update_v2(); + create_binary_message(MessageType::Update, update) }; - send_command( - &mut write, - SessionCommand::MergeUpdates { - project_id: project_id.to_string(), - data: update_data, - updated_by: Some(user_id.to_string()), - }, - ) - .await?; + write.send(Message::Binary(update_data)).await?; info!("MergeUpdates command sent with YJS update"); send_command( &mut write, SessionCommand::Complete { - project_id: project_id.to_string(), - user: test_user.clone(), + project_id: project_id.clone(), + user: user.clone(), }, ) .await?; @@ -205,7 +230,7 @@ async fn main() -> Result<(), Box> { send_command( &mut write, SessionCommand::CheckStatus { - project_id: project_id.to_string(), + project_id: project_id.clone(), }, ) .await?; @@ -214,7 +239,7 @@ async fn main() -> Result<(), Box> { send_command( &mut write, SessionCommand::ListAllSnapshotsVersions { - project_id: project_id.to_string(), + project_id: project_id.clone(), }, ) .await?; @@ -223,8 +248,8 @@ async fn main() -> Result<(), Box> { send_command( &mut write, SessionCommand::End { - project_id: project_id.to_string(), - user: test_user.clone(), + project_id: project_id.clone(), + user: user.clone(), }, ) .await?; @@ -233,7 +258,7 @@ async fn main() -> Result<(), Box> { send_command( &mut write, SessionCommand::RemoveTask { - project_id: project_id.to_string(), + project_id: project_id.clone(), }, ) .await?; diff --git a/websocket/app/src/errors.rs b/websocket/app/src/errors.rs index e0a8df1a8..9446157fb 100644 --- a/websocket/app/src/errors.rs +++ b/websocket/app/src/errors.rs @@ -1,7 +1,7 @@ use flow_websocket_infra::persistence::{ gcs::gcs_client::GcsError, redis::errors::FlowProjectRedisDataManagerError, }; -use flow_websocket_services::manage_project_edit_session::SessionCommand; +use flow_websocket_services::SessionCommand; use thiserror::Error; use tokio::sync::broadcast; @@ -40,4 +40,6 @@ pub enum WsError { GcsStorage(#[from] GcsError), #[error(transparent)] Room(#[from] crate::room::RoomError), + #[error(transparent)] + Axum(#[from] axum::Error), } diff --git a/websocket/app/src/handlers/cleanup.rs b/websocket/app/src/handlers/cleanup.rs index f704476a4..bc3740ccc 100644 --- a/websocket/app/src/handlers/cleanup.rs +++ b/websocket/app/src/handlers/cleanup.rs @@ -1,6 +1,7 @@ use crate::state::AppState; use flow_websocket_infra::types::user::User; -use flow_websocket_services::manage_project_edit_session::SessionCommand; +use flow_websocket_services::SessionCommand; + use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -15,40 +16,31 @@ pub fn perform_cleanup( project_id: Option, state: Arc, cleanup_tx: Sender<()>, -) -> impl Fn() { - move || { - if is_cleaning_up - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - { - let state = state.clone(); - let room_id = room_id.clone(); - let user = user.clone(); - let project_id = project_id.clone(); - let cleanup_tx = cleanup_tx.clone(); +) { + if is_cleaning_up + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + tokio::spawn(async move { + if let Err(e) = state.leave(&room_id, &user.id).await { + debug!("Cleanup error: {:?}", e); + } - tokio::spawn(async move { - if let Err(e) = state.leave(&room_id, &user.id).await { - debug!("Cleanup error: {:?}", e); + if let Some(project_id) = project_id { + if let Err(e) = state.command_tx.send(SessionCommand::End { + project_id: project_id.clone(), + }) { + debug!("Failed to send End command: {:?}", e); } - if let Some(project_id) = project_id { - if let Err(e) = state.command_tx.send(SessionCommand::End { - project_id: project_id.clone(), - user: user.clone(), - }) { - debug!("Failed to send End command: {:?}", e); - } - - if let Err(e) = state.command_tx.send(SessionCommand::RemoveTask { - project_id: project_id.clone(), - }) { - debug!("Failed to send RemoveTask command: {:?}", e); - } + if let Err(e) = state.command_tx.send(SessionCommand::RemoveTask { + project_id: project_id.clone(), + }) { + debug!("Failed to send RemoveTask command: {:?}", e); } + } - let _ = cleanup_tx.send(()).await; - }); - } + let _ = cleanup_tx.send(()).await; + }); } } diff --git a/websocket/app/src/handlers/message_handler.rs b/websocket/app/src/handlers/message_handler.rs index 1c3f45374..198d1ca16 100644 --- a/websocket/app/src/handlers/message_handler.rs +++ b/websocket/app/src/handlers/message_handler.rs @@ -1,17 +1,21 @@ use crate::{errors::WsError, state::AppState}; use axum::extract::ws::Message; use flow_websocket_infra::types::user::User; -use flow_websocket_services::manage_project_edit_session::SessionCommand; +use flow_websocket_services::SessionCommand; use std::{net::SocketAddr, sync::Arc}; -use tracing::{debug, trace}; +use tracing::debug; -use super::{room_handler::handle_room_event, types::FlowMessage}; +use super::{ + room_handler::{handle_room_event, handle_session_command}, + socket_handler::ConnectionState, + types::{parse_message, FlowMessage, MessageType}, +}; pub async fn handle_message( msg: Message, addr: SocketAddr, room_id: &str, - project_id: Option, + conn_state: &ConnectionState, state: Arc, user: User, ) -> Result, WsError> { @@ -20,35 +24,58 @@ pub async fn handle_message( let msg: FlowMessage = serde_json::from_str(&t)?; if let Some(command) = msg.session_command { - state.command_tx.send(command)?; + handle_session_command(command.clone(), conn_state, &user, &state).await?; + if matches!(command, SessionCommand::End { .. }) { + conn_state.start_cleanup(); + } } else { handle_room_event(&msg.event, room_id, &state, &user).await?; } Ok(None) } Message::Binary(d) => { - trace!("{} sent {} bytes: {:?}", addr, d.len(), d); - if d.len() >= 3 { - if let Some(project_id) = project_id { - state.command_tx.send(SessionCommand::MergeUpdates { - project_id: project_id.clone(), - data: d, - updated_by: Some(user.id.clone()), - })?; - } + debug!("{} sent {} bytes: {:?}", addr, d.len(), d); + + if let Some(response) = process_binary_message(d, conn_state, &user, &state).await? { + conn_state.send_message(response).await?; } Ok(None) } Message::Close(_) => { debug!("Client {addr} sent close message"); - if let Some(project_id) = project_id { - state.command_tx.send(SessionCommand::End { - project_id: project_id.clone(), - user: user.clone(), - })?; - } + conn_state.start_cleanup(); Ok(None) } - _ => Ok(None), + Message::Ping(data) => Ok(Some(Message::Pong(data))), + Message::Pong(_) => Ok(None), + } +} + +async fn process_binary_message( + data: Vec, + conn_state: &ConnectionState, + user: &User, + state: &Arc, +) -> Result, WsError> { + if let Some((msg_type, payload)) = parse_message(&data) { + let project_id = conn_state.current_project_id.lock().await.clone(); + if let Some(pid) = project_id { + match msg_type { + MessageType::Update => { + state.command_tx.send(SessionCommand::MergeUpdates { + project_id: pid, + data: payload.to_vec(), + updated_by: Some(user.id.clone()), + })?; + } + MessageType::Sync => { + state.command_tx.send(SessionCommand::ProcessStateVector { + project_id: pid, + state_vector: payload.to_vec(), + })?; + } + } + } } + Ok(None) } diff --git a/websocket/app/src/handlers/mod.rs b/websocket/app/src/handlers/mod.rs index 66f879e56..4c112377b 100644 --- a/websocket/app/src/handlers/mod.rs +++ b/websocket/app/src/handlers/mod.rs @@ -5,6 +5,8 @@ mod room_handler; mod socket_handler; mod types; +pub use types::MessageType; + use crate::handlers::socket_handler::handle_socket; use crate::state::AppState; use axum::extract::{ws::WebSocketUpgrade, ConnectInfo, Path, Query, State}; @@ -29,7 +31,6 @@ pub async fn handle_upgrade( query.token().to_string(), room_id, state, - query.project_id(), user, ) }) diff --git a/websocket/app/src/handlers/room_handler.rs b/websocket/app/src/handlers/room_handler.rs index c9fd8739e..d4311e953 100644 --- a/websocket/app/src/handlers/room_handler.rs +++ b/websocket/app/src/handlers/room_handler.rs @@ -1,8 +1,10 @@ use flow_websocket_infra::types::user::User; +use flow_websocket_services::SessionCommand; use crate::{errors::WsError, state::AppState}; use std::sync::Arc; +use super::socket_handler::ConnectionState; use super::types::Event; pub async fn handle_room_event( @@ -27,3 +29,96 @@ pub async fn handle_room_event( } Ok(()) } + +pub async fn handle_session_command( + command: SessionCommand, + conn_state: &ConnectionState, + user: &User, + state: &Arc, +) -> Result<(), WsError> { + let mut project_id = conn_state.current_project_id.lock().await; + + let command = match command { + SessionCommand::Start { + project_id: pid, .. + } => { + *project_id = Some(pid.clone()); + SessionCommand::Start { + project_id: pid, + user: user.clone(), + } + } + SessionCommand::End { .. } => { + if let Some(pid) = project_id.clone() { + *project_id = None; + SessionCommand::End { project_id: pid } + } else { + return Ok(()); + } + } + cmd => { + if let Some(pid) = project_id.clone() { + match cmd { + SessionCommand::Complete { .. } => SessionCommand::Complete { + project_id: pid.clone(), + user: user.clone(), + }, + SessionCommand::CheckStatus { .. } => SessionCommand::CheckStatus { + project_id: pid.clone(), + }, + SessionCommand::AddTask { .. } => SessionCommand::AddTask { + project_id: pid.clone(), + }, + SessionCommand::RemoveTask { .. } => SessionCommand::RemoveTask { + project_id: pid.clone(), + }, + SessionCommand::ListAllSnapshotsVersions { .. } => { + SessionCommand::ListAllSnapshotsVersions { + project_id: pid.clone(), + } + } + SessionCommand::MergeUpdates { data, .. } => SessionCommand::MergeUpdates { + project_id: pid.clone(), + data, + updated_by: Some(user.id.clone()), + }, + SessionCommand::ProcessStateVector { state_vector, .. } => { + SessionCommand::ProcessStateVector { + project_id: pid.clone(), + state_vector, + } + } + _ => cmd, + } + } else { + match cmd { + SessionCommand::CreateWorkspace { workspace } => { + SessionCommand::CreateWorkspace { workspace } + } + SessionCommand::DeleteWorkspace { workspace_id } => { + SessionCommand::DeleteWorkspace { workspace_id } + } + SessionCommand::UpdateWorkspace { workspace } => { + SessionCommand::UpdateWorkspace { workspace } + } + SessionCommand::ListWorkspaceProjectsIds { workspace_id } => { + SessionCommand::ListWorkspaceProjectsIds { workspace_id } + } + SessionCommand::CreateProject { project } => { + SessionCommand::CreateProject { project } + } + SessionCommand::DeleteProject { project_id } => { + SessionCommand::DeleteProject { project_id } + } + SessionCommand::UpdateProject { project } => { + SessionCommand::UpdateProject { project } + } + _ => return Ok(()), + } + } + } + }; + + state.command_tx.send(command).map_err(WsError::from)?; + Ok(()) +} diff --git a/websocket/app/src/handlers/socket_handler.rs b/websocket/app/src/handlers/socket_handler.rs index 43789d291..3a7e04072 100644 --- a/websocket/app/src/handlers/socket_handler.rs +++ b/websocket/app/src/handlers/socket_handler.rs @@ -1,9 +1,13 @@ +use crate::errors::WsError; use crate::handlers::message_handler::handle_message; use crate::state::AppState; use axum::extract::ws::{Message, WebSocket}; use flow_websocket_infra::types::user::User; +use futures::SinkExt; +use futures_util::stream::SplitSink; use futures_util::StreamExt; use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::{net::SocketAddr, sync::Arc}; use tokio::sync::{mpsc, Mutex}; use tracing::debug; @@ -11,13 +15,37 @@ use tracing::debug; use super::cleanup::perform_cleanup; use super::heartbeat::start_heartbeat; +pub struct ConnectionState { + pub sender: Arc>>, + pub is_cleaning_up: Arc, + pub cleanup_tx: mpsc::Sender<()>, + pub current_project_id: Arc>>, +} + +impl ConnectionState { + pub async fn send_message(&self, message: Message) -> Result<(), WsError> { + let mut sender = self.sender.lock().await; + sender.send(message).await?; + Ok(()) + } + + pub fn start_cleanup(&self) { + if self + .is_cleaning_up + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + let _ = self.cleanup_tx.try_send(()); + } + } +} + pub async fn handle_socket( mut socket: WebSocket, addr: SocketAddr, token: String, room_id: String, state: Arc, - project_id: Option, user: User, ) { if !verify_connection(&mut socket, &addr, &token).await { @@ -28,24 +56,36 @@ pub async fn handle_socket( let sender = Arc::new(Mutex::new(sender)); let is_cleaning_up = Arc::new(AtomicBool::new(false)); let (cleanup_tx, mut cleanup_rx) = mpsc::channel(1); + let current_project_id = Arc::new(Mutex::new(None)); + + let conn_state = ConnectionState { + sender: sender.clone(), + is_cleaning_up: is_cleaning_up.clone(), + cleanup_tx: cleanup_tx.clone(), + current_project_id: current_project_id.clone(), + }; let cleanup = { let is_cleaning_up = is_cleaning_up.clone(); let room_id = room_id.clone(); let user = user.clone(); - let project_id = project_id.clone(); let state = state.clone(); let cleanup_tx = cleanup_tx.clone(); + let current_project_id = current_project_id.clone(); Arc::new(move || { - let _ = perform_cleanup( - is_cleaning_up.clone(), - room_id.clone(), - user.clone(), - project_id.clone(), - state.clone(), - cleanup_tx.clone(), - ); + let is_cleaning_up = is_cleaning_up.clone(); + let room_id = room_id.clone(); + let user = user.clone(); + let state = state.clone(); + let cleanup_tx = cleanup_tx.clone(); + let current_project_id = current_project_id.clone(); + + tokio::spawn(async move { + let mut project_id_lock = current_project_id.lock().await; + let project_id = project_id_lock.take(); + perform_cleanup(is_cleaning_up, room_id, user, project_id, state, cleanup_tx); + }); }) as Arc }; @@ -58,7 +98,7 @@ pub async fn handle_socket( msg, addr, &room_id, - project_id.clone(), + &conn_state, state.clone(), user.clone(), ) diff --git a/websocket/app/src/handlers/types.rs b/websocket/app/src/handlers/types.rs index 37a62f11b..90c337629 100644 --- a/websocket/app/src/handlers/types.rs +++ b/websocket/app/src/handlers/types.rs @@ -1,4 +1,4 @@ -use flow_websocket_services::manage_project_edit_session::SessionCommand; +use flow_websocket_services::SessionCommand; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -20,7 +20,6 @@ pub struct FlowMessage { pub struct WebSocketQuery { token: String, user_id: String, - project_id: Option, } impl WebSocketQuery { @@ -28,15 +27,44 @@ impl WebSocketQuery { &self.user_id } - pub fn project_id(&self) -> Option { - self.project_id.clone() + pub fn token(&self) -> &str { + &self.token } +} - pub fn _update_project_id(&mut self, project_id: Option) { - self.project_id = project_id; +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum MessageType { + Update = 1, + Sync = 2, +} + +impl MessageType { + pub fn from_byte(byte: u8) -> Option { + match byte { + 1 => Some(Self::Update), + 2 => Some(Self::Sync), + _ => None, + } } - pub fn token(&self) -> &str { - &self.token + pub fn _as_byte(&self) -> u8 { + *self as u8 } } + +/// Parses a binary message according to the Flow protocol format: +/// - Byte 0: Message type (1 = UPDATE, 2 = SYNC) +/// - Bytes 1+: Message payload +/// +/// Returns None if the input is empty or has an invalid message type. +/// Returns Some((message_type, payload)) on successful parsing. +pub fn parse_message(data: &[u8]) -> Option<(MessageType, &[u8])> { + // Ensure we have at least one byte for the message type + let type_byte = *data.first()?; + + // Parse and validate message type + let msg_type = MessageType::from_byte(type_byte)?; + + // Return message type and remaining payload + Some((msg_type, &data[1..])) +} diff --git a/websocket/app/src/lib.rs b/websocket/app/src/lib.rs index 69110aebc..38ce33e17 100644 --- a/websocket/app/src/lib.rs +++ b/websocket/app/src/lib.rs @@ -6,5 +6,6 @@ mod room; pub mod state; pub use config::Config; mod routes; +pub use handlers::MessageType; pub use middleware::add_middleware; pub use routes::create_router; diff --git a/websocket/app/src/state.rs b/websocket/app/src/state.rs index d72c585da..f6df00b95 100644 --- a/websocket/app/src/state.rs +++ b/websocket/app/src/state.rs @@ -11,7 +11,7 @@ use flow_websocket_infra::persistence::ProjectGcsRepository; #[allow(unused_imports)] use flow_websocket_infra::persistence::ProjectLocalRepository; use flow_websocket_services::manage_project_edit_session::ManageEditSessionService; -use flow_websocket_services::manage_project_edit_session::SessionCommand; +use flow_websocket_services::SessionCommand; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; diff --git a/websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs b/websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs index b76c92336..6090160ff 100644 --- a/websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs +++ b/websocket/crates/infra/src/persistence/redis/flow_project_redis_data_manager.rs @@ -94,8 +94,8 @@ impl FlowProjectRedisDataManager { debug!("Update data: {:?}", update_data); let data_in_redis = self.get_state_in_redis(project_id).await?; - debug!("State updates in redis--------------"); - debug!("State updates in redis: {:?}", data_in_redis); + debug!("State data in redis--------------"); + debug!("State data in redis: {:?}", data_in_redis); let (new_merged_update, new_updates_by) = self .update_manager @@ -164,9 +164,7 @@ impl RedisDataManagerImpl for FlowProjectRedisDataManager { type Error = FlowProjectRedisDataManagerError; async fn get_current_state(&self, project_id: &str) -> Result>, Self::Error> { - let state_key = self.key_manager.state_key(project_id)?; - let current_state: Option> = self.redis_pool.get().await?.get(state_key).await?; - Ok(current_state) + self.update_manager.get_current_state(project_id).await } async fn get_state_updates_by(&self, project_id: &str) -> Result, Self::Error> { @@ -245,6 +243,16 @@ impl RedisDataManagerImpl for FlowProjectRedisDataManager { .await?; Ok(result) } + + async fn process_state_vector( + &self, + project_id: &str, + state_vector: Vec, + ) -> Result>, Self::Error> { + self.update_manager + .handle_state_vector(project_id, state_vector) + .await + } } #[async_trait::async_trait] diff --git a/websocket/crates/infra/src/persistence/redis/updates.rs b/websocket/crates/infra/src/persistence/redis/updates.rs index b1fdc9426..d5512bfcd 100644 --- a/websocket/crates/infra/src/persistence/redis/updates.rs +++ b/websocket/crates/infra/src/persistence/redis/updates.rs @@ -5,9 +5,10 @@ use super::{ }; use bb8::Pool; use bb8_redis::RedisConnectionManager; +use redis::AsyncCommands; use std::sync::Arc; use tracing::debug; -use yrs::{updates::decoder::Decode, Doc, Transact, Update}; +use yrs::{updates::decoder::Decode, Doc, ReadTxn, StateVector, Transact, Update}; type RedisStreamResult = Vec<(String, Vec<(String, Vec<(String, Vec)>)>)>; @@ -30,26 +31,22 @@ impl UpdateManager { pub async fn get_merged_update_from_stream( &self, project_id: &str, - ) -> Result, Vec)>, FlowProjectRedisDataManagerError> { + ) -> Result>, Vec)>, FlowProjectRedisDataManagerError> { let updates = self.get_flow_updates_from_stream(project_id).await?; if updates.is_empty() { return Ok(None); } - let doc = Doc::new(); - let mut txn = doc.transact_mut(); + let mut datas = Vec::new(); let mut updates_by = Vec::new(); for u in updates { - debug!("Processing update: {:?}", u); if let Some(updated_by) = u.updated_by { updates_by.push(updated_by); } - if !u.update.is_empty() { - let _ = txn.apply_update(Update::decode_v2(&u.update)?); - } + datas.push(u.update); } - Ok(Some((txn.encode_update_v2(), updates_by))) + Ok(Some((datas, updates_by))) } pub async fn get_update_stream_items( @@ -101,110 +98,89 @@ impl UpdateManager { redis_data: Option>, new_update_by: Option, ) -> Result<(Vec, Vec), FlowProjectRedisDataManagerError> { - debug!( - "Starting merge_updates_internal for project_id: {}, has_redis_data: {}, new_update_by: {:?}", - project_id, - redis_data.is_some(), - new_update_by - ); - - let (stream_update, stream_updates_by) = match self.get_merged_update_from_stream(project_id).await { - Ok(Some((update, updates_by))) => { - debug!( - "Retrieved stream update for project {}: size={}, updates_by={:?}", - project_id, - update.len(), - updates_by - ); - (update, updates_by) - } - Ok(None) => { - debug!("No existing stream updates found for project {}", project_id); - Default::default() - } - Err(e) => { - debug!( - "Error getting merged update from stream for project {}: {:?}", - project_id, e - ); - return Err(e); + let stream_updates = self.get_merged_update_from_stream(project_id).await?; + + let updates_by = match &stream_updates { + Some((_, by)) => { + let mut updates_by = by.clone(); + if let Some(new_update_by) = new_update_by { + updates_by.push(new_update_by); + } + updates_by } + None => new_update_by.map(|u| vec![u]).unwrap_or_default(), }; - - let redis_update = redis_data.unwrap_or_default(); - debug!( - "Redis update size for project {}: {}", - project_id, - redis_update.len() - ); - - debug!( - "Spawning blocking task to merge updates for project {}", - project_id - ); - let optimized_merged_state = tokio::task::spawn_blocking(move || { - debug!("Starting merge operation in blocking task"); - let doc = Doc::new(); - let mut txn = doc.transact_mut(); - - if !redis_update.is_empty() { - debug!("Applying redis update of size {}", redis_update.len()); - match Update::decode_v2(&redis_update) { - Ok(update) => { - if let Err(e) = txn.apply_update(update) { - debug!("Error applying redis update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); - } - } - Err(e) => { - debug!("Error decoding redis update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); + + let optimized_merged_state = tokio::task::spawn_blocking( + move || -> Result, FlowProjectRedisDataManagerError> { + let doc = Doc::new(); + let mut txn = doc.transact_mut(); + + if let Some(redis_update) = redis_data { + txn.apply_update(Update::decode_v2(&redis_update)?)?; + } + + if let Some((updates, _)) = stream_updates { + for update in updates { + txn.apply_update(Update::decode_v2(&update)?)?; } } - } - - if !stream_update.is_empty() { - debug!("Applying stream update of size {}", stream_update.len()); - match Update::decode_v2(&stream_update) { + + Ok(txn.encode_update_v2()) + }, + ) + .await??; + + debug!("Final merged state: {:?}", optimized_merged_state); + Ok((optimized_merged_state, updates_by)) + } + + pub async fn get_current_state( + &self, + project_id: &str, + ) -> Result>, FlowProjectRedisDataManagerError> { + let state_key = self.key_manager.state_key(project_id)?; + let current_state: Option> = self.redis_pool.get().await?.get(state_key).await?; + Ok(current_state) + } + + pub async fn handle_state_vector( + &self, + project_id: &str, + state_vector: Vec, + ) -> Result>, FlowProjectRedisDataManagerError> { + let current_state = self.get_current_state(project_id).await?; + + if let Some(server_state) = current_state { + let diff_update = tokio::task::spawn_blocking(move || { + let doc = Doc::new(); + let mut txn = doc.transact_mut(); + + match Update::decode_v2(&server_state) { Ok(update) => { - if let Err(e) = txn.apply_update(update) { - debug!("Error applying stream update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); - } - } - Err(e) => { - debug!("Error decoding stream update: {:?}", e); - return Err(FlowProjectRedisDataManagerError::from(e)); + txn.apply_update(update)?; } + Err(e) => return Err(FlowProjectRedisDataManagerError::from(e)), } + + let client_state_vector = StateVector::decode_v2(&state_vector)?; + + let diff = txn.encode_state_as_update_v2(&client_state_vector); + Ok(diff) + }) + .await + .map_err(FlowProjectRedisDataManagerError::from)??; + + if !diff_update.is_empty() { + debug!("Generated diff update of size: {}", diff_update.len()); + Ok(Some(diff_update)) + } else { + debug!("No updates needed for client"); + Ok(None) } - - let result = txn.encode_update_v2(); - debug!("Successfully encoded merged update of size {}", result.len()); - Ok(result) - }) - .await - .map_err(|e| { - debug!("Join error from blocking task: {:?}", e); - FlowProjectRedisDataManagerError::from(e) - })??; - - let mut updates_by = stream_updates_by; - if let Some(new_update_by) = new_update_by { - debug!( - "Adding new update attribution for project {}: {}", - project_id, new_update_by - ); - updates_by.push(new_update_by); + } else { + debug!("No server state exists yet"); + Ok(None) } - - debug!( - "Completed merge_updates_internal for project {}: final_size={}, updates_by={:?}", - project_id, - optimized_merged_state.len(), - updates_by - ); - - Ok((optimized_merged_state, updates_by)) } } diff --git a/websocket/crates/infra/src/persistence/repository.rs b/websocket/crates/infra/src/persistence/repository.rs index 292d73c41..0b4eeb455 100644 --- a/websocket/crates/infra/src/persistence/repository.rs +++ b/websocket/crates/infra/src/persistence/repository.rs @@ -46,6 +46,11 @@ pub trait RedisDataManagerImpl { update_data: Vec, updated_by: Option, ) -> Result<(Vec, Vec), Self::Error>; + async fn process_state_vector( + &self, + project_id: &str, + state_vector: Vec, + ) -> Result>, Self::Error>; async fn clear_data( &self, project_id: &str, diff --git a/websocket/crates/services/examples/edit_session_service.rs b/websocket/crates/services/examples/edit_session_service.rs index b45e3eaad..85baf6266 100644 --- a/websocket/crates/services/examples/edit_session_service.rs +++ b/websocket/crates/services/examples/edit_session_service.rs @@ -12,13 +12,13 @@ use flow_websocket_infra::{ }, types::user::User, }; -use flow_websocket_services::manage_project_edit_session::{ - ManageEditSessionService, SessionCommand, +use flow_websocket_services::{ + manage_project_edit_session::ManageEditSessionService, SessionCommand, }; use std::sync::Arc; use tokio::sync::broadcast; use tracing::info; -use yrs::{Doc, Text, Transact}; +use yrs::{updates::encoder::Encode, Doc, ReadTxn, Text, Transact}; ///export REDIS_URL="redis://default:my_redis_password@localhost:6379/0" ///RUST_LOG=debug cargo run --example edit_session_service --features local-storage @@ -152,10 +152,26 @@ async fn main() -> Result<(), Box> { info!("Ending session"); + info!("Processing state vector"); + info!("--------------------------------"); + info!("Processing state vector"); + let doc2 = Doc::new(); + let state_vector = { + let txn = doc2.transact(); + txn.state_vector().encode_v2() + }; + + tx.send(SessionCommand::ProcessStateVector { + project_id: project_id.clone(), + state_vector, + })?; + + info!("Ending session"); + info!("--------------------------------"); + // End session tx.send(SessionCommand::End { project_id: project_id.clone(), - user: test_user.clone(), })?; info!("Removing task"); diff --git a/websocket/crates/services/src/lib.rs b/websocket/crates/services/src/lib.rs index bc184a097..10e76a353 100644 --- a/websocket/crates/services/src/lib.rs +++ b/websocket/crates/services/src/lib.rs @@ -5,3 +5,4 @@ pub mod project; pub use error::ProjectServiceError; pub mod types; pub use types::ManageProjectEditSessionTaskData; +pub use types::SessionCommand; diff --git a/websocket/crates/services/src/manage_project_edit_session.rs b/websocket/crates/services/src/manage_project_edit_session.rs index 66e0634db..96d5eba15 100644 --- a/websocket/crates/services/src/manage_project_edit_session.rs +++ b/websocket/crates/services/src/manage_project_edit_session.rs @@ -1,3 +1,4 @@ +use super::SessionCommand; use chrono::Utc; use flow_websocket_infra::persistence::editing_session::ProjectEditingSession; use flow_websocket_infra::persistence::project_repository::ProjectRepositoryError; @@ -6,11 +7,8 @@ use flow_websocket_infra::persistence::repository::{ ProjectEditingSessionImpl, ProjectImpl, ProjectSnapshotImpl, RedisDataManagerImpl, WorkspaceImpl, }; -use flow_websocket_infra::types::project::Project; use flow_websocket_infra::types::user::User; -use flow_websocket_infra::types::workspace::Workspace; use mockall::automock; -use serde::{Deserialize, Serialize}; use std::time::Duration; use std::{collections::HashMap, sync::Arc}; use tokio::sync::broadcast; @@ -41,60 +39,6 @@ where tasks: Arc>>, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum SessionCommand { - Start { - project_id: String, - user: User, - }, - End { - project_id: String, - user: User, - }, - Complete { - project_id: String, - user: User, - }, - CheckStatus { - project_id: String, - }, - AddTask { - project_id: String, - }, - RemoveTask { - project_id: String, - }, - ListAllSnapshotsVersions { - project_id: String, - }, - MergeUpdates { - project_id: String, - data: Vec, - updated_by: Option, - }, - CreateWorkspace { - workspace: Workspace, - }, - DeleteWorkspace { - workspace_id: String, - }, - UpdateWorkspace { - workspace: Workspace, - }, - ListWorkspaceProjectsIds { - workspace_id: String, - }, - CreateProject { - project: Project, - }, - DeleteProject { - project_id: String, - }, - UpdateProject { - project: Project, - }, -} - #[automock] impl ManageEditSessionService where @@ -147,148 +91,116 @@ where async fn handle_command( &self, result: Result, - ) -> Result<(), ProjectServiceError> { - debug!("Handling command: {:?}", result); - + ) -> Result>, ProjectServiceError> { match result { - Ok(command) => { - debug!("Processing command: {:?}", command); - match command { - SessionCommand::Start { project_id, user } => { - debug!("Starting session for project: {}, user: {:?}", project_id, user); - let result = self.handle_session_start(&project_id, user).await; - debug!("Session start result: {:?}", result); - result?; - } - SessionCommand::End { project_id, user } => { - debug!("Ending session for project: {}, user: {:?}", project_id, user); - let result = self.handle_session_end(&project_id, user).await; - debug!("Session end result: {:?}", result); - result?; - } - SessionCommand::Complete { project_id, user } => { - debug!("Completing session for project: {}, user: {:?}", project_id, user); - if let Some(mut session) = self.get_latest_session(&project_id).await? { - debug!("Found latest session: {:?}", session); - let result = self.complete_job_if_met_requirements(&mut session).await; - debug!("Job completion result: {:?}", result); - result?; - debug!( - "Job completed by user: {} for project: {}", - user.id, project_id - ); - } else { - debug!("No session found for project: {}", project_id); - } - } - SessionCommand::MergeUpdates { - project_id, - data, - updated_by, - } => { - debug!( - "Merging updates for project: {}, updated by: {:?}, data length: {}", - project_id, - updated_by, - data.len() - ); - let result = self.project_service - .merge_updates(&project_id, data, updated_by) - .await; - debug!("Merge updates result: {:?}", result); - result?; - } - SessionCommand::CheckStatus { project_id } => { - debug!("Checking session status for project: {}", project_id); - } - SessionCommand::AddTask { project_id } => { - debug!("Adding task for project: {}", project_id); - let result = self.add_task(&project_id).await; - debug!("Add task result: {:?}", result); - result?; - } - SessionCommand::RemoveTask { project_id } => { - debug!("Removing task for project: {}", project_id); - let result = self.remove_task(&project_id).await; - debug!("Remove task result: {:?}", result); - result?; - } - SessionCommand::ListAllSnapshotsVersions { project_id } => { - debug!("Listing snapshots versions for project: {}", project_id); - let versions = self - .project_service - .list_all_snapshots_versions(&project_id) - .await?; - debug!( - "Retrieved {} snapshots versions for project {}: {:?}", - versions.len(), - project_id, - versions - ); - } - // Workspace related commands - SessionCommand::CreateWorkspace { workspace } => { - debug!("Creating workspace: {:?}", workspace); - let result = self.project_service.create_workspace(workspace.clone()).await; - debug!("Create workspace result for {}: {:?}", workspace.id, result); - result?; - } - SessionCommand::DeleteWorkspace { workspace_id } => { - debug!("Deleting workspace: {}", workspace_id); - let result = self.project_service.delete_workspace(&workspace_id).await; - debug!("Delete workspace result: {:?}", result); - result?; - } - SessionCommand::UpdateWorkspace { workspace } => { - debug!("Updating workspace: {:?}", workspace); - let result = self.project_service.update_workspace(workspace.clone()).await; - debug!("Update workspace result for {}: {:?}", workspace.id, result); - result?; - } - SessionCommand::ListWorkspaceProjectsIds { workspace_id } => { - debug!("Listing projects for workspace: {}", workspace_id); - let projects = self - .project_service - .list_workspace_projects_ids(&workspace_id) - .await?; + Ok(command) => match command { + SessionCommand::Start { project_id, user } => { + self.handle_session_start(&project_id, user).await?; + Ok(None) + } + SessionCommand::End { project_id } => { + self.handle_session_end(&project_id).await?; + Ok(None) + } + SessionCommand::Complete { project_id, user } => { + if let Some(mut session) = self.get_latest_session(&project_id).await? { + self.complete_job_if_met_requirements(&mut session).await?; debug!( - "Retrieved {} projects for workspace {}: {:?}", - projects.len(), - workspace_id, - projects + "Job completed by user: {} for project: {}", + user.id, project_id ); } - // Project related commands - SessionCommand::CreateProject { project } => { - debug!("Creating project: {:?}", project); - let result = self.project_service.create_project(project.clone()).await; - debug!("Create project result for {}: {:?}", project.id, result); - result?; - } - SessionCommand::DeleteProject { project_id } => { - debug!("Deleting project: {}", project_id); - let result = self.project_service.delete_project(&project_id).await; - debug!("Delete project result: {:?}", result); - result?; - } - SessionCommand::UpdateProject { project } => { - debug!("Updating project: {:?}", project); - let result = self.project_service.update_project(project.clone()).await; - debug!("Update project result for {}: {:?}", project.id, result); - result?; - } + Ok(None) } - } + SessionCommand::MergeUpdates { + project_id, + data, + updated_by, + } => { + self.project_service + .merge_updates(&project_id, data, updated_by) + .await?; + Ok(None) + } + SessionCommand::ProcessStateVector { + project_id, + state_vector, + } => { + let updates = self + .project_service + .process_state_vector(&project_id, state_vector) + .await?; + debug!("Processed state vector for project: {}", project_id); + Ok(updates) + } + SessionCommand::CheckStatus { project_id } => { + debug!("Checking session status for project: {}", project_id); + Ok(None) + } + SessionCommand::AddTask { project_id } => { + self.add_task(&project_id).await?; + Ok(None) + } + SessionCommand::RemoveTask { project_id } => { + self.remove_task(&project_id).await?; + Ok(None) + } + SessionCommand::ListAllSnapshotsVersions { project_id } => { + let versions = self + .project_service + .list_all_snapshots_versions(&project_id) + .await?; + debug!( + "Snapshots versions for project {}: {:?}", + project_id, versions + ); + Ok(None) + } + // Workspace related commands + SessionCommand::CreateWorkspace { workspace } => { + self.project_service.create_workspace(workspace).await?; + Ok(None) + } + SessionCommand::DeleteWorkspace { workspace_id } => { + self.project_service.delete_workspace(&workspace_id).await?; + Ok(None) + } + SessionCommand::UpdateWorkspace { workspace } => { + self.project_service.update_workspace(workspace).await?; + Ok(None) + } + SessionCommand::ListWorkspaceProjectsIds { workspace_id } => { + let projects = self + .project_service + .list_workspace_projects_ids(&workspace_id) + .await?; + debug!("Projects for workspace {}: {:?}", workspace_id, projects); + Ok(None) + } + // Project related commands + SessionCommand::CreateProject { project } => { + self.project_service.create_project(project).await?; + Ok(None) + } + SessionCommand::DeleteProject { project_id } => { + self.project_service.delete_project(&project_id).await?; + Ok(None) + } + SessionCommand::UpdateProject { project } => { + self.project_service.update_project(project).await?; + Ok(None) + } + }, Err(broadcast::error::RecvError::Closed) => { - debug!("Command channel closed, waiting before retry"); + debug!("Command channel closed"); sleep(Duration::from_secs(1)).await; + Ok(None) } Err(broadcast::error::RecvError::Lagged(n)) => { - debug!("Receiver lagged behind by {} messages, some commands may have been dropped", n); + debug!("Receiver lagged behind by {} messages", n); + Ok(None) } } - debug!("Command handling completed successfully"); - Ok(()) } async fn check_tasks_conditions(&self) -> Result<(), ProjectServiceError> { @@ -328,11 +240,7 @@ where Ok(()) } - async fn handle_session_end( - &self, - project_id: &str, - _user: User, - ) -> Result<(), ProjectServiceError> { + async fn handle_session_end(&self, project_id: &str) -> Result<(), ProjectServiceError> { if let Some(task_data) = self.get_task_data(project_id).await { self.update_client_count(&task_data, false).await; diff --git a/websocket/crates/services/src/project.rs b/websocket/crates/services/src/project.rs index 234b39532..70cf5954c 100644 --- a/websocket/crates/services/src/project.rs +++ b/websocket/crates/services/src/project.rs @@ -102,6 +102,18 @@ where Ok(()) } + pub async fn process_state_vector( + &self, + project_id: &str, + state_vector: Vec, + ) -> Result>, ProjectServiceError> { + let ret = self + .redis_data_manager + .process_state_vector(project_id, state_vector) + .await?; + Ok(ret) + } + pub async fn get_or_create_editing_session( &self, project_id: &str, diff --git a/websocket/crates/services/src/types.rs b/websocket/crates/services/src/types.rs index d902be970..4be2ce9f6 100644 --- a/websocket/crates/services/src/types.rs +++ b/websocket/crates/services/src/types.rs @@ -1,7 +1,67 @@ use chrono::{DateTime, Utc}; +use flow_websocket_infra::types::{project::Project, user::User, workspace::Workspace}; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::RwLock; +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "tag", content = "content")] +pub enum SessionCommand { + Start { + project_id: String, + user: User, + }, + End { + project_id: String, + }, + Complete { + project_id: String, + user: User, + }, + CheckStatus { + project_id: String, + }, + AddTask { + project_id: String, + }, + RemoveTask { + project_id: String, + }, + ListAllSnapshotsVersions { + project_id: String, + }, + MergeUpdates { + project_id: String, + data: Vec, + updated_by: Option, + }, + ProcessStateVector { + project_id: String, + state_vector: Vec, + }, + CreateWorkspace { + workspace: Workspace, + }, + DeleteWorkspace { + workspace_id: String, + }, + UpdateWorkspace { + workspace: Workspace, + }, + ListWorkspaceProjectsIds { + workspace_id: String, + }, + CreateProject { + project: Project, + }, + DeleteProject { + project_id: String, + }, + UpdateProject { + project: Project, + }, +} + #[derive(Clone, Debug)] pub struct ManageProjectEditSessionTaskData { pub project_id: String, From 53ea438514a24d55e0c2318cdb025384e8a1292a Mon Sep 17 00:00:00 2001 From: xy Date: Wed, 27 Nov 2024 02:15:12 +0900 Subject: [PATCH 13/17] Add data log --- ui/src/lib/yjs/socketYjsManager.ts | 64 ++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index 0608d6ee0..fbabdfca4 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -226,9 +226,24 @@ export class SocketYjsManager { const update = data.update instanceof ArrayBuffer ? new Uint8Array(data.update) : data.update; + + console.log("Received peer update:", { + updateLength: update.length, + rawUpdate: Array.from(update) + }); const currentState = Y.encodeStateAsUpdateV2(this.doc); + console.log("Current state before diff:", { + stateLength: currentState.length, + rawState: Array.from(currentState) + }); + const diffUpdate = Y.diffUpdateV2(update, currentState); + console.log("Diff update:", { + diffLength: diffUpdate.length, + rawDiff: Array.from(diffUpdate) + }); + Y.applyUpdateV2(this.doc, diffUpdate, 'peer'); this.onUpdateHandlers.forEach((handler) => handler(update)); } @@ -237,10 +252,24 @@ export class SocketYjsManager { await this.isReady(); const currentState = Y.encodeStateAsUpdateV2(this.doc); + console.log("Current state:", { + stateLength: currentState.length, + rawState: Array.from(currentState) + }); + const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); + console.log("State vector:", { + vectorLength: stateVector.length, + rawVector: Array.from(stateVector) + }); if (this.ws.readyState === WebSocket.OPEN) { const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); + console.log("Sending sync message:", { + messageLength: syncMessage.length, + messageType: syncMessage[0], + rawMessage: Array.from(syncMessage) + }); this.ws.send(syncMessage); } @@ -276,7 +305,21 @@ export class SocketYjsManager { protected onDocUpdate(update: Uint8Array, origin: unknown) { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { + console.log("Sending doc update:", { + type: "UPDATE", + updateLength: update.length, + rawUpdate: Array.from(update), // 转换为普通数组以便查看 + origin, + clientId: this.doc.clientID + }); + const updateMessage = createBinaryMessage(MessageType.UPDATE, update); + console.log("Final binary message:", { + messageLength: updateMessage.length, + messageType: updateMessage[0], + rawMessage: Array.from(updateMessage) + }); + this.ws.send(updateMessage); } } @@ -331,8 +374,21 @@ export class SocketYjsManager { } function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { - const message = new Uint8Array(data.length + 1); - message[0] = type; - message.set(data, 1); - return message; + console.log("Creating binary message:", { + type, + dataLength: data.length, + rawData: Array.from(data) + }); + + const message = new Uint8Array(data.length + 1); + message[0] = type; + message.set(data, 1); + + console.log("Created message:", { + messageLength: message.length, + messageType: message[0], + rawMessage: Array.from(message) + }); + + return message; } From 06f22682b2896e0caa144a2c2a14411cfc51fa3e Mon Sep 17 00:00:00 2001 From: xy Date: Wed, 27 Nov 2024 03:16:51 +0900 Subject: [PATCH 14/17] Encode data before send --- ui/src/lib/yjs/socketYjsManager.ts | 41 +++++++++--------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index fbabdfca4..af00f8c55 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -305,22 +305,18 @@ export class SocketYjsManager { protected onDocUpdate(update: Uint8Array, origin: unknown) { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { - console.log("Sending doc update:", { - type: "UPDATE", - updateLength: update.length, - rawUpdate: Array.from(update), // 转换为普通数组以便查看 - origin, - clientId: this.doc.clientID - }); - - const updateMessage = createBinaryMessage(MessageType.UPDATE, update); - console.log("Final binary message:", { - messageLength: updateMessage.length, - messageType: updateMessage[0], - rawMessage: Array.from(updateMessage) - }); - - this.ws.send(updateMessage); + console.log("Received doc update:", { + updateLength: update.length, + rawUpdate: Array.from(update) + }); + + const updateMessage = createBinaryMessage(MessageType.UPDATE, update); + console.log("Sending update message:", { + messageLength: updateMessage.length, + messageType: updateMessage[0] + }); + + this.ws.send(updateMessage); } } @@ -374,21 +370,8 @@ export class SocketYjsManager { } function createBinaryMessage(type: MessageType, data: Uint8Array): Uint8Array { - console.log("Creating binary message:", { - type, - dataLength: data.length, - rawData: Array.from(data) - }); - const message = new Uint8Array(data.length + 1); message[0] = type; message.set(data, 1); - - console.log("Created message:", { - messageLength: message.length, - messageType: message[0], - rawMessage: Array.from(message) - }); - return message; } From d452e3ea858ce12de61b43640b754a93c8e0c76a Mon Sep 17 00:00:00 2001 From: xy Date: Wed, 27 Nov 2024 23:29:41 +0900 Subject: [PATCH 15/17] Fix encode issue --- ui/src/lib/yjs/socketYjsManager.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index af00f8c55..cda7ecf00 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -111,7 +111,7 @@ export class SocketYjsManager { } private setupDocListeners() { - this.doc.on("update", this.onDocUpdate); + this.doc.on("updateV2", this.onDocUpdate); } protected onConnectionEstablished() { @@ -305,6 +305,7 @@ export class SocketYjsManager { protected onDocUpdate(update: Uint8Array, origin: unknown) { if (origin === this.doc.clientID && this.ws.readyState === WebSocket.OPEN) { + console.log("Received doc update from self doc:", this.doc); console.log("Received doc update:", { updateLength: update.length, rawUpdate: Array.from(update) @@ -315,6 +316,9 @@ export class SocketYjsManager { messageLength: updateMessage.length, messageType: updateMessage[0] }); + console.log("Raw update message:", { + rawMessage: Array.from(updateMessage) + }); this.ws.send(updateMessage); } From 247877e74d50122c4d34e586a884b3a0f80124ff Mon Sep 17 00:00:00 2001 From: xy Date: Thu, 28 Nov 2024 00:25:38 +0900 Subject: [PATCH 16/17] Fix sync encode --- ui/src/lib/yjs/socketYjsManager.ts | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index cda7ecf00..b012dc927 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -247,35 +247,28 @@ export class SocketYjsManager { Y.applyUpdateV2(this.doc, diffUpdate, 'peer'); this.onUpdateHandlers.forEach((handler) => handler(update)); } - async syncData() { await this.isReady(); - const currentState = Y.encodeStateAsUpdateV2(this.doc); - console.log("Current state:", { - stateLength: currentState.length, - rawState: Array.from(currentState) - }); - - const stateVector = Y.encodeStateVectorFromUpdateV2(currentState); + const stateVector = Y.encodeStateAsUpdateV2(this.doc); console.log("State vector:", { vectorLength: stateVector.length, rawVector: Array.from(stateVector) }); if (this.ws.readyState === WebSocket.OPEN) { - const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); - console.log("Sending sync message:", { - messageLength: syncMessage.length, - messageType: syncMessage[0], - rawMessage: Array.from(syncMessage) - }); - this.ws.send(syncMessage); + const syncMessage = createBinaryMessage(MessageType.SYNC, stateVector); + console.log("Sending sync message:", { + messageLength: syncMessage.length, + messageType: syncMessage[0], + rawMessage: Array.from(syncMessage) + }); + this.ws.send(syncMessage); } if (!this.firstSyncComplete) { - this.firstSyncComplete = true; - queueMicrotask(() => this.syncData()); + this.firstSyncComplete = true; + queueMicrotask(() => this.syncData()); } } From 3940df93a7170eb8267bfb56e7d48c0fcfc95f14 Mon Sep 17 00:00:00 2001 From: xy Date: Thu, 28 Nov 2024 00:58:26 +0900 Subject: [PATCH 17/17] Update sync data encode --- ui/src/lib/yjs/socketYjsManager.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ui/src/lib/yjs/socketYjsManager.ts b/ui/src/lib/yjs/socketYjsManager.ts index b012dc927..b66ba3b48 100644 --- a/ui/src/lib/yjs/socketYjsManager.ts +++ b/ui/src/lib/yjs/socketYjsManager.ts @@ -247,10 +247,11 @@ export class SocketYjsManager { Y.applyUpdateV2(this.doc, diffUpdate, 'peer'); this.onUpdateHandlers.forEach((handler) => handler(update)); } + async syncData() { await this.isReady(); - const stateVector = Y.encodeStateAsUpdateV2(this.doc); + const stateVector = Y.encodeStateVector(this.doc); console.log("State vector:", { vectorLength: stateVector.length, rawVector: Array.from(stateVector)