diff --git a/src/edge.js b/src/edge.js index fe9c1fb..b41c318 100644 --- a/src/edge.js +++ b/src/edge.js @@ -9,7 +9,12 @@ * OF ANY KIND, either express or implied. See the License for the specific language * governing permissions and limitations under the License. */ -import { invalidateFromAdmin, setupWSConnection } from './shareddoc.js'; +import { + invalidateFromAdmin, + setupWSConnection, + createYDoc, + setupYDoc, +} from './shareddoc.js'; // This is the Edge Worker, built using Durable Objects! @@ -219,11 +224,13 @@ export default { // connect to the room using WebSockets, and the room broadcasts messages from each participant // to all others. export class DocRoom { - constructor(controller, env) { + constructor(controller, env, docs = new Map()) { // `controller.storage` provides access to our durable storage. It provides a simple KV // get()/put() interface. this.storage = controller.storage; + this.docs = docs; + // `env` is our environment bindings (discussed earlier). this.env = env; this.id = controller?.id?.toString() || `no-controller-${new Date().getTime()}`; @@ -241,15 +248,17 @@ export class DocRoom { const baseURL = request.url.substring(0, qidx); const api = url.searchParams.get('api'); + // eslint-disable-next-line no-console + console.log('API Call received', api, baseURL); switch (api) { case 'deleteAdmin': - if (await invalidateFromAdmin(baseURL)) { + if (await invalidateFromAdmin(this.docs.get(baseURL))) { return new Response(null, { status: 204 }); } else { return new Response('Not Found', { status: 404 }); } case 'syncAdmin': - if (await invalidateFromAdmin(baseURL)) { + if (await invalidateFromAdmin(this.docs.get(baseURL))) { return new Response('OK', { status: 200 }); } else { return new Response('Not Found', { status: 404 }); @@ -350,7 +359,30 @@ export class DocRoom { // eslint-disable-next-line no-console console.log('DocRoom setting up WSConnection for document with id', docName, this.id); - const timingData = await setupWSConnection(webSocket, docName, this.env, this.storage); + const timingData = new Map(); + + let ydoc = this.docs.get(docName); + if (!ydoc) { + // eslint-disable-next-line no-console + console.log('Document not found in cache', docName); + ydoc = createYDoc( + docName, + webSocket, + this.env, + this.storage, + this.docs, + true, + ); + this.docs.set(docName, ydoc); + } else { + // eslint-disable-next-line no-console + console.log('Document found in cache', docName); + } + + await setupYDoc(ydoc, webSocket, timingData); + + await setupWSConnection(webSocket, ydoc); + return timingData; } } diff --git a/src/shareddoc.js b/src/shareddoc.js index b7c67d2..1d1bea7 100644 --- a/src/shareddoc.js +++ b/src/shareddoc.js @@ -24,9 +24,6 @@ const wsReadyStateOpen = 1; // disable gc when using snapshots! const gcEnabled = false; -// The local cache of ydocs -const docs = new Map(); - const messageSync = 0; const messageAwareness = 1; const MAX_STORAGE_KEYS = 128; @@ -39,25 +36,26 @@ const MAX_STORAGE_VALUE_SIZE = 131072; * @param {WebSocket} conn - the websocket connection to close. */ export const closeConn = (doc, conn) => { - // eslint-disable-next-line no-console - console.log('Closing connection', doc.name, doc.conns.size); - if (doc.conns.has(conn)) { - const controlledIds = doc.conns.get(conn); - doc.conns.delete(conn); - try { + try { + // eslint-disable-next-line no-console + console.log('Closing connection', doc.name, doc.conns.size); + if (doc.conns.has(conn)) { + const controlledIds = doc.conns.get(conn); + doc.conns.delete(conn); awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null); - } catch (err) { - // eslint-disable-next-line no-console - console.error('Error removing awareness states', err); } - if (doc.conns.size === 0) { + if (doc.onClose && doc.conns.size === 0) { // eslint-disable-next-line no-console - console.log('No connections left, removing document from local map', doc.name); - docs.delete(doc.name); + console.log(`All connections closed for ${doc.name} - removing from docs cache`); + doc.onClose(); } + + conn.close(); + } catch (e) { + // eslint-disable-next-line no-console + console.error('Could not properly close the connection', e); } - conn.close(); }; const send = (doc, conn, m) => { @@ -181,7 +179,7 @@ export const showError = (ydoc, err) => { }; export const persistence = { - closeConn: closeConn.bind(this), + closeConn, /** * Get the document from da-admin. If da-admin doesn't have the doc, a new empty doc is @@ -295,7 +293,7 @@ export const persistence = { * @param {WebSocket} conn - the websocket connection * @param {TransactionalStorage} storage - the worker transactional storage object */ - bindState: async (docName, ydoc, conn, storage) => { + bindState: async (docName, ydoc, conn, storage, docsCache) => { let timingReadStateDuration; let timingDaAdminGetDuration; @@ -354,7 +352,7 @@ export const persistence = { // eslint-disable-next-line no-console console.log('Could not be restored, trying to restore from da-admin', docName); setTimeout(() => { - if (ydoc === docs.get(docName)) { + if (ydoc === docsCache.get(docName)) { const rootType = ydoc.getXmlFragment('prosemirror'); ydoc.transact(() => { try { @@ -377,7 +375,7 @@ export const persistence = { ydoc.on('update', async () => { // Whenever we receive an update on the document store it in the local storage - if (ydoc === docs.get(docName)) { // make sure this ydoc is still active + if (ydoc === docsCache.get(docName)) { // make sure this ydoc is still active storeState(docName, Y.encodeStateAsUpdate(ydoc), storage); } }); @@ -385,7 +383,7 @@ export const persistence = { ydoc.on('update', debounce(async () => { // If we receive an update on the document, store it in da-admin, but debounce it // to avoid excessive da-admin calls. - if (ydoc === docs.get(docName)) { + if (ydoc === docsCache.get(docName)) { current = await persistence.update(ydoc, current); } }, 2000, { maxWait: 10000 })); @@ -453,25 +451,25 @@ export class WSSharedDoc extends Y.Doc { * @param {boolean} gc - whether garbage collection is enabled * @returns The Yjs document object, which may be shared across multiple sockets. */ -export const getYDoc = async (docname, conn, env, storage, timingData, gc = true) => { - let doc = docs.get(docname); - if (doc === undefined) { - // The doc is not yet in the cache, create a new one. - doc = new WSSharedDoc(docname); - doc.gc = gc; - docs.set(docname, doc); - } - - if (!doc.conns.get(conn)) { - doc.conns.set(conn, new Set()); - } +export const createYDoc = (docname, conn, env, storage, docsCache, gc = true) => { + const doc = new WSSharedDoc(docname); + doc.gc = gc; // Store the service binding to da-admin which we receive through the environment in the doc doc.daadmin = env.daadmin; - if (!doc.promise) { - // The doc is not yet bound to the persistence layer, do so now. The promise will be resolved - // when bound. - doc.promise = persistence.bindState(docname, doc, conn, storage); + doc.promise = persistence.bindState(docname, doc, conn, storage, docsCache); + doc.onClose = () => { + // eslint-disable-next-line no-console + console.log('Document on close - remove from docs cache', docname); + docsCache.delete(docname); + }; + + return doc; +}; + +export const setupYDoc = async (doc, conn, timingData) => { + if (!doc.conns.get(conn)) { + doc.conns.set(conn, new Set()); } // We wait for the promise, for second and subsequent connections to the same doc, this will @@ -480,12 +478,8 @@ export const getYDoc = async (docname, conn, env, storage, timingData, gc = true if (timingData) { timings.forEach((v, k) => timingData.set(k, v)); } - return doc; }; -// For testing -export const setYDoc = (docname, ydoc) => docs.set(docname, ydoc); - // This read sync message handles readonly connections const readSyncMessage = (decoder, encoder, doc, readOnly, transactionOrigin) => { const messageType = decoding.readVarUint(decoder); @@ -539,7 +533,7 @@ export const messageListener = (conn, doc, message) => { // eslint-disable-next-line no-console, no-nested-ternary console.error('messageListener - Message', err.message); // eslint-disable-next-line no-console - console.error('Error in messageListener', err); + console.error(`Error in messageListener ${doc?.name} - messageType: ${messageType}`, err); showError(doc, err); } }; @@ -552,18 +546,18 @@ export const messageListener = (conn, doc, message) => { * @param {string} docName - The name of the document * @returns true if the document was found and invalidated, false otherwise. */ -export const invalidateFromAdmin = async (docName) => { - // eslint-disable-next-line no-console - console.log('Invalidate from Admin received', docName); - const ydoc = docs.get(docName); +export const invalidateFromAdmin = async (ydoc) => { if (ydoc) { + // eslint-disable-next-line no-console + console.log('Invalidating document', ydoc.name); + // As we are closing all connections, the ydoc will be removed from the docs map ydoc.conns.forEach((_, c) => closeConn(ydoc, c)); return true; } else { // eslint-disable-next-line no-console - console.log('Document not found', docName); + console.log('No document to invalidate'); } return false; }; @@ -576,20 +570,16 @@ export const invalidateFromAdmin = async (docName) => { * @param {TransactionalStorage} storage - The worker transactional storage object * @returns {Promise} - The return value of this */ -export const setupWSConnection = async (conn, docName, env, storage) => { - const timingData = new Map(); - +export const setupWSConnection = async (conn, ydoc) => { // eslint-disable-next-line no-param-reassign conn.binaryType = 'arraybuffer'; - // get doc, initialize if it does not exist yet - const doc = await getYDoc(docName, conn, env, storage, timingData, true); // listen and reply to events - conn.addEventListener('message', (message) => messageListener(conn, doc, new Uint8Array(message.data))); + conn.addEventListener('message', (message) => messageListener(conn, ydoc, new Uint8Array(message.data))); // Check if connection is still alive conn.addEventListener('close', () => { - closeConn(doc, conn); + closeConn(ydoc, conn); }); // put the following in a variables in a block so the interval handlers don't keep in in // scope @@ -597,20 +587,18 @@ export const setupWSConnection = async (conn, docName, env, storage) => { // send sync step 1 let encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageSync); - syncProtocol.writeSyncStep1(encoder, doc); - send(doc, conn, encoding.toUint8Array(encoder)); - const awarenessStates = doc.awareness.getStates(); + syncProtocol.writeSyncStep1(encoder, ydoc); + send(ydoc, conn, encoding.toUint8Array(encoder)); + const awarenessStates = ydoc.awareness.getStates(); if (awarenessStates.size > 0) { encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageAwareness); encoding.writeVarUint8Array(encoder, awarenessProtocol - .encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys()))); - send(doc, conn, encoding.toUint8Array(encoder)); + .encodeAwarenessUpdate(ydoc.awareness, Array.from(awarenessStates.keys()))); + send(ydoc, conn, encoding.toUint8Array(encoder)); } } catch (err) { // eslint-disable-next-line no-console console.error('Error in setupWSConnection', err); } - - return timingData; }; diff --git a/test/edge.test.js b/test/edge.test.js index 861815c..9376918 100644 --- a/test/edge.test.js +++ b/test/edge.test.js @@ -13,7 +13,7 @@ import assert from 'assert'; import * as Y from 'yjs'; import defaultEdge, { DocRoom, handleApiRequest, handleErrors } from '../src/edge.js'; -import { WSSharedDoc, persistence, setYDoc } from '../src/shareddoc.js'; +import { WSSharedDoc, persistence } from '../src/shareddoc.js'; import { doc2aem } from '../src/collab.js'; function hash(str) { @@ -138,7 +138,6 @@ describe('Worker test suite', () => { it('Docroom deleteFromAdmin', async () => { const ydocName = 'http://foobar.com/q.html'; const testYdoc = new WSSharedDoc(ydocName); - const m = setYDoc(ydocName, testYdoc); const connCalled = [] const mockConn = { @@ -150,12 +149,10 @@ describe('Worker test suite', () => { url: `${ydocName}?api=deleteAdmin` }; - const dr = new DocRoom({}); + const dr = new DocRoom({}, {}, new Map([[ydocName, testYdoc]])); - assert(m.has(ydocName), 'Precondition'); const resp = await dr.fetch(req) assert.equal(204, resp.status); - assert(!m.has(ydocName), 'Doc should have been removed'); assert.deepStrictEqual(['close'], connCalled); }); @@ -172,8 +169,7 @@ describe('Worker test suite', () => { it('Docroom syncFromAdmin', async () => { const ydocName = 'http://foobar.com/a/b/c.html'; const testYdoc = new WSSharedDoc(ydocName); - const m = setYDoc(ydocName, testYdoc); - + const connCalled = [] const mockConn = { close() { connCalled.push('close'); } @@ -184,12 +180,10 @@ describe('Worker test suite', () => { url: `${ydocName}?api=syncAdmin` }; - const dr = new DocRoom({}); + const dr = new DocRoom({}, {}, new Map([[ydocName, testYdoc]])); - assert(m.has(ydocName), 'Precondition'); const resp = await dr.fetch(req) assert.equal(200, resp.status); - assert(!m.has(ydocName), 'Doc should have been removed'); assert.deepStrictEqual(['close'], connCalled); }); @@ -255,14 +249,14 @@ describe('Worker test suite', () => { assert.equal('au123', wsp1.auth); const acceptIdx = wspCalled.indexOf('accept'); - const alMessIdx = wspCalled.indexOf('addEventListener message'); - const alClsIdx = wspCalled.indexOf('addEventListener close'); + const alMessCount = wspCalled.filter(call => call === 'addEventListener message').length; + const alClsCount = wspCalled.filter(call => call === 'addEventListener close').length; const clsIdx = wspCalled.indexOf('close'); - assert(acceptIdx >= 0); - assert(alMessIdx > acceptIdx); - assert(alClsIdx > alMessIdx); - assert(clsIdx > alClsIdx); + assert(acceptIdx === 0); + assert(alMessCount === 1); + assert(alClsCount === 1); + assert(clsIdx === wspCalled.length - 1); } finally { DocRoom.newWebSocketPair = savedNWSP; persistence.bindState = savedBS; diff --git a/test/shareddoc.test.js b/test/shareddoc.test.js index 97ca695..c9e5486 100644 --- a/test/shareddoc.test.js +++ b/test/shareddoc.test.js @@ -14,8 +14,8 @@ import assert from 'assert'; import esmock from 'esmock'; import { - closeConn, getYDoc, invalidateFromAdmin, messageListener, persistence, - readState, setupWSConnection, setYDoc, showError, storeState, updateHandler, WSSharedDoc, + closeConn, createYDoc, invalidateFromAdmin, messageListener, persistence, + readState, setupWSConnection, showError, storeState, updateHandler, WSSharedDoc, } from '../src/shareddoc.js'; import { aem2doc, doc2aem, EMPTY_DOC } from '../src/collab.js'; @@ -388,11 +388,7 @@ describe('Collab Test Suite', () => { const testYDoc = new WSSharedDoc(docName); testYDoc.conns = conns; - const m = setYDoc(docName, testYDoc); - - assert(m.has(docName), 'Precondition'); - invalidateFromAdmin(docName); - assert(!m.has(docName), 'Document should have been removed from global map'); + invalidateFromAdmin(testYDoc); const res1 = ['close1', 'close2']; const res2 = ['close2', 'close1']; @@ -411,7 +407,6 @@ describe('Collab Test Suite', () => { conns: new Map(), }; mockDoc.awareness.states.set('123', null); - const docs = setYDoc(mockDoc.name, mockDoc); const called = []; const mockConn = { @@ -422,17 +417,11 @@ describe('Collab Test Suite', () => { mockDoc.conns.set(mockConn, ids); assert.equal(0, called.length, 'Precondition'); - assert(docs.get(mockDoc.name), 'Precondition'); closeConn(mockDoc, mockConn); assert.deepStrictEqual(['close'], called); assert.equal(0, mockDoc.conns.size); assert.deepStrictEqual(['123'], awarenessEmitted[0][0].removed, 'removeAwarenessStates should be called'); - - assert.equal(docs.get(mockDoc.name), undefined, - 'Document should be removed from global map'); - - assert(docs.get(mockDoc.name) === undefined, 'Should have been removed from docs map'); }); it('Test close unknown connection', async () => { @@ -467,7 +456,6 @@ describe('Collab Test Suite', () => { auth: 'myauth', authActions: ['read'] }; - pss.setYDoc(docName, testYDoc); const mockStorage = { list: () => new Map() }; @@ -476,7 +464,9 @@ describe('Collab Test Suite', () => { pss.persistence.update = async (d, v) => updated.set(d, v); assert.equal(0, updated.size, 'Precondition'); - await pss.persistence.bindState(docName, testYDoc, mockConn, mockStorage); + const docs = new Map(); + docs.set(docName, testYDoc); + await pss.persistence.bindState(docName, testYDoc, mockConn, mockStorage, docs); assert.equal(0, aem2DocCalled.length, 'Precondition, it\'s important to handle the doc setting async'); @@ -502,7 +492,6 @@ describe('Collab Test Suite', () => { const ydocUpdateCB = []; const testYDoc = new Y.Doc(); testYDoc.on = (ev, f) => { if (ev === 'update') ydocUpdateCB.push(f); } - pss.setYDoc(docName, testYDoc); const called = [] const mockStorage = { @@ -520,7 +509,9 @@ describe('Collab Test Suite', () => { try { globalThis.setTimeout = () => setTimeoutCalls.push('setTimeout'); - await pss.persistence.bindState(docName, testYDoc, {}, mockStorage); + const docs = new Map(); + docs.set(docName, testYDoc); + await pss.persistence.bindState(docName, testYDoc, {}, mockStorage, docs); } finally { globalThis.setTimeout = savedSetTimeout; } @@ -578,7 +569,6 @@ describe('Collab Test Suite', () => { it('Test bindstate falls back to daadmin on worker storage error', async () => { const docName = 'https://admin.da.live/source/foo/bar.html'; const ydoc = new Y.Doc(); - setYDoc(docName, ydoc); const storage = { list: async () => { throw new Error('yikes') } }; @@ -593,7 +583,9 @@ describe('Collab Test Suite', () => {
From daadmin
`; - await persistence.bindState(docName, ydoc, {}, storage); + const docs = new Map(); + docs.set(docName, ydoc); + await persistence.bindState(docName, ydoc, {}, storage, docs); assert(doc2aem(ydoc).includes('

From daadmin

')); } finally { @@ -620,7 +612,6 @@ describe('Collab Test Suite', () => { updObservers.push(fun); } }; - pss.setYDoc(docName, ydoc); const savedSetTimeout = globalThis.setTimeout; const savedGet = pss.persistence.get; @@ -641,7 +632,9 @@ describe('Collab Test Suite', () => { } }; - await pss.persistence.bindState(docName, ydoc, {}, storage); + const docs = new Map(); + docs.set(docName, ydoc); + await pss.persistence.bindState(docName, ydoc, {}, storage, docs); aem2doc('
newcontent
', ydoc); @@ -674,7 +667,6 @@ describe('Collab Test Suite', () => { const ydoc = new Y.Doc(); ydoc.daadmin = serviceBinding; - setYDoc(docName, ydoc); const conn = {}; const storage = { deleteAll: async () => {}, @@ -711,7 +703,6 @@ describe('Collab Test Suite', () => { const ydoc = new Y.Doc(); ydoc.daadmin = serviceBinding; - setYDoc(docName, ydoc); const conn = {}; const deleteAllCalled = []; @@ -734,7 +725,10 @@ describe('Collab Test Suite', () => { f(); }; - await persistence.bindState(docName, ydoc, conn, storage); + const docs = new Map(); + docs.set(docName, ydoc); + + await persistence.bindState(docName, ydoc, conn, storage, docs); assert.deepStrictEqual([true], deleteAllCalled); assert.equal(1, setTimeoutCalled.length, 'SetTimeout should have been called to update the doc'); } finally { @@ -753,7 +747,6 @@ describe('Collab Test Suite', () => { updObservers.push(fun); } }; - setYDoc(docName, ydoc); const conn = {}; const called = []; @@ -773,7 +766,10 @@ describe('Collab Test Suite', () => { }; persistence.get = async () => '
myinitial
'; - await persistence.bindState(docName, ydoc, conn, storage); + const docs = new Map(); + docs.set(docName, ydoc); + + await persistence.bindState(docName, ydoc, conn, storage, docs); assert(doc2aem(ydoc).includes('myinitial')); assert.equal(2, updObservers.length); @@ -796,7 +792,7 @@ describe('Collab Test Suite', () => { } }); - it('Test getYDoc', async () => { + it('Test createYDoc', async () => { const savedBS = persistence.bindState; try { @@ -809,7 +805,7 @@ describe('Collab Test Suite', () => { const mockConn = {}; assert.equal(0, bsCalls.length, 'Precondition'); - const doc = await getYDoc(docName, mockConn, {}, {}); + const doc = await createYDoc(docName, mockConn, {}, {}); assert.equal(1, bsCalls.length); assert.equal(bsCalls[0].dn, docName); assert.equal(bsCalls[0].d, doc); @@ -817,10 +813,8 @@ describe('Collab Test Suite', () => { const daadmin = { foo: 'bar' } const env = { daadmin }; - const doc2 = await getYDoc(docName, mockConn, env, {}); - assert.equal(1, bsCalls.length, 'Should not have called bindstate again'); - assert.equal(doc, doc2); - assert.equal('bar', doc.daadmin.foo, 'Should have bound daadmin now'); + const doc2 = await createYDoc(docName, mockConn, env, {}); + assert.equal('bar', doc2.daadmin.foo, 'Should have bound daadmin now'); } finally { persistence.bindState = savedBS; } @@ -904,7 +898,9 @@ describe('Collab Test Suite', () => { assert.equal(0, bindCalls.length, 'Precondition'); assert.equal(0, eventListeners.size, 'Precondition'); - await setupWSConnection(mockConn, docName, env, storage); + + const ydoc = await createYDoc(docName, mockConn, env, storage, new Map(), new Map(), true); + await setupWSConnection(mockConn, ydoc); assert.equal('arraybuffer', mockConn.binaryType); assert.equal(1, bindCalls.length); @@ -951,10 +947,10 @@ describe('Collab Test Suite', () => { states: awarenessStates }; - const ydoc = await getYDoc(docName, mockConn, {}, {}, true); + const ydoc = await createYDoc(docName, mockConn, {}, {}, new Map(), new Map(), true); ydoc.awareness = awareness; - await setupWSConnection(mockConn, docName, {}, {}); + await setupWSConnection(mockConn, ydoc); assert.equal(0, closeCalls.length); assert.equal(2, sendCalls.length);