Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions src/edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import { invalidateFromAdmin, setupWSConnection } from './shareddoc.js';
import {
invalidateFromAdmin,
setupWSConnection,
createYDoc,
setupYDoc,
} from './shareddoc.js';

// This is the Edge Worker, built using Durable Objects!

Expand Down Expand Up @@ -219,11 +224,13 @@ export default {
// connect to the room using WebSockets, and the room broadcasts messages from each participant
// to all others.
export class DocRoom {
constructor(controller, env) {
constructor(controller, env, docs = new Map()) {
// `controller.storage` provides access to our durable storage. It provides a simple KV
// get()/put() interface.
this.storage = controller.storage;

this.docs = docs;

// `env` is our environment bindings (discussed earlier).
this.env = env;
this.id = controller?.id?.toString() || `no-controller-${new Date().getTime()}`;
Expand All @@ -241,15 +248,17 @@ export class DocRoom {
const baseURL = request.url.substring(0, qidx);

const api = url.searchParams.get('api');
// eslint-disable-next-line no-console
console.log('API Call received', api, baseURL);
switch (api) {
case 'deleteAdmin':
if (await invalidateFromAdmin(baseURL)) {
if (await invalidateFromAdmin(this.docs.get(baseURL))) {
return new Response(null, { status: 204 });
} else {
return new Response('Not Found', { status: 404 });
}
case 'syncAdmin':
if (await invalidateFromAdmin(baseURL)) {
if (await invalidateFromAdmin(this.docs.get(baseURL))) {
return new Response('OK', { status: 200 });
} else {
return new Response('Not Found', { status: 404 });
Expand Down Expand Up @@ -350,7 +359,30 @@ export class DocRoom {
// eslint-disable-next-line no-console
console.log('DocRoom setting up WSConnection for document with id', docName, this.id);

const timingData = await setupWSConnection(webSocket, docName, this.env, this.storage);
const timingData = new Map();

let ydoc = this.docs.get(docName);
if (!ydoc) {
// eslint-disable-next-line no-console
console.log('Document not found in cache', docName);
ydoc = createYDoc(
docName,
webSocket,
this.env,
this.storage,
this.docs,
true,
);
this.docs.set(docName, ydoc);
} else {
// eslint-disable-next-line no-console
console.log('Document found in cache', docName);
}

await setupYDoc(ydoc, webSocket, timingData);

await setupWSConnection(webSocket, ydoc);

return timingData;
}
}
110 changes: 49 additions & 61 deletions src/shareddoc.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ const wsReadyStateOpen = 1;
// disable gc when using snapshots!
const gcEnabled = false;

// The local cache of ydocs
const docs = new Map();

const messageSync = 0;
const messageAwareness = 1;
const MAX_STORAGE_KEYS = 128;
Expand All @@ -39,25 +36,26 @@ const MAX_STORAGE_VALUE_SIZE = 131072;
* @param {WebSocket} conn - the websocket connection to close.
*/
export const closeConn = (doc, conn) => {
// eslint-disable-next-line no-console
console.log('Closing connection', doc.name, doc.conns.size);
if (doc.conns.has(conn)) {
const controlledIds = doc.conns.get(conn);
doc.conns.delete(conn);
try {
try {
// eslint-disable-next-line no-console
console.log('Closing connection', doc.name, doc.conns.size);
if (doc.conns.has(conn)) {
const controlledIds = doc.conns.get(conn);
doc.conns.delete(conn);
awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null);
} catch (err) {
// eslint-disable-next-line no-console
console.error('Error removing awareness states', err);
}

if (doc.conns.size === 0) {
if (doc.onClose && doc.conns.size === 0) {
// eslint-disable-next-line no-console
console.log('No connections left, removing document from local map', doc.name);
docs.delete(doc.name);
console.log(`All connections closed for ${doc.name} - removing from docs cache`);
doc.onClose();
}

conn.close();
} catch (e) {
// eslint-disable-next-line no-console
console.error('Could not properly close the connection', e);
}
conn.close();
};

const send = (doc, conn, m) => {
Expand Down Expand Up @@ -181,7 +179,7 @@ export const showError = (ydoc, err) => {
};

export const persistence = {
closeConn: closeConn.bind(this),
closeConn,

/**
* Get the document from da-admin. If da-admin doesn't have the doc, a new empty doc is
Expand Down Expand Up @@ -295,7 +293,7 @@ export const persistence = {
* @param {WebSocket} conn - the websocket connection
* @param {TransactionalStorage} storage - the worker transactional storage object
*/
bindState: async (docName, ydoc, conn, storage) => {
bindState: async (docName, ydoc, conn, storage, docsCache) => {
let timingReadStateDuration;
let timingDaAdminGetDuration;

Expand Down Expand Up @@ -354,7 +352,7 @@ export const persistence = {
// eslint-disable-next-line no-console
console.log('Could not be restored, trying to restore from da-admin', docName);
setTimeout(() => {
if (ydoc === docs.get(docName)) {
if (ydoc === docsCache.get(docName)) {
const rootType = ydoc.getXmlFragment('prosemirror');
ydoc.transact(() => {
try {
Expand All @@ -377,15 +375,15 @@ export const persistence = {

ydoc.on('update', async () => {
// Whenever we receive an update on the document store it in the local storage
if (ydoc === docs.get(docName)) { // make sure this ydoc is still active
if (ydoc === docsCache.get(docName)) { // make sure this ydoc is still active
storeState(docName, Y.encodeStateAsUpdate(ydoc), storage);
}
});

ydoc.on('update', debounce(async () => {
// If we receive an update on the document, store it in da-admin, but debounce it
// to avoid excessive da-admin calls.
if (ydoc === docs.get(docName)) {
if (ydoc === docsCache.get(docName)) {
current = await persistence.update(ydoc, current);
}
}, 2000, { maxWait: 10000 }));
Expand Down Expand Up @@ -453,25 +451,25 @@ export class WSSharedDoc extends Y.Doc {
* @param {boolean} gc - whether garbage collection is enabled
* @returns The Yjs document object, which may be shared across multiple sockets.
*/
export const getYDoc = async (docname, conn, env, storage, timingData, gc = true) => {
let doc = docs.get(docname);
if (doc === undefined) {
// The doc is not yet in the cache, create a new one.
doc = new WSSharedDoc(docname);
doc.gc = gc;
docs.set(docname, doc);
}

if (!doc.conns.get(conn)) {
doc.conns.set(conn, new Set());
}
export const createYDoc = (docname, conn, env, storage, docsCache, gc = true) => {
const doc = new WSSharedDoc(docname);
doc.gc = gc;

// Store the service binding to da-admin which we receive through the environment in the doc
doc.daadmin = env.daadmin;
if (!doc.promise) {
// The doc is not yet bound to the persistence layer, do so now. The promise will be resolved
// when bound.
doc.promise = persistence.bindState(docname, doc, conn, storage);
doc.promise = persistence.bindState(docname, doc, conn, storage, docsCache);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not doing what happened before. IIRC, the idea was to use the promise to make sure the doc doesn't get bound to persistence called concurrently. @bosschaert, are you sure this is still correct (I would have expected to have this move up to the caller like it used to for the await)...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, the doc.promise should only be set if it wasn't there yet. Then we await on it in the next line of code. If it resolved already the await will be a no-op.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I restored the logic but split the method in 2: createYDoc and setupYDoc. Create is only called once, setup each time a new session starts.
Too bad this is not tested.

doc.onClose = () => {
// eslint-disable-next-line no-console
console.log('Document on close - remove from docs cache', docname);
docsCache.delete(docname);
};

return doc;
};

export const setupYDoc = async (doc, conn, timingData) => {
if (!doc.conns.get(conn)) {
doc.conns.set(conn, new Set());
}

// We wait for the promise, for second and subsequent connections to the same doc, this will
Expand All @@ -480,12 +478,8 @@ export const getYDoc = async (docname, conn, env, storage, timingData, gc = true
if (timingData) {
timings.forEach((v, k) => timingData.set(k, v));
}
return doc;
};

// For testing
export const setYDoc = (docname, ydoc) => docs.set(docname, ydoc);

// This read sync message handles readonly connections
const readSyncMessage = (decoder, encoder, doc, readOnly, transactionOrigin) => {
const messageType = decoding.readVarUint(decoder);
Expand Down Expand Up @@ -539,7 +533,7 @@ export const messageListener = (conn, doc, message) => {
// eslint-disable-next-line no-console, no-nested-ternary
console.error('messageListener - Message', err.message);
// eslint-disable-next-line no-console
console.error('Error in messageListener', err);
console.error(`Error in messageListener ${doc?.name} - messageType: ${messageType}`, err);
showError(doc, err);
}
};
Expand All @@ -552,18 +546,18 @@ export const messageListener = (conn, doc, message) => {
* @param {string} docName - The name of the document
* @returns true if the document was found and invalidated, false otherwise.
*/
export const invalidateFromAdmin = async (docName) => {
// eslint-disable-next-line no-console
console.log('Invalidate from Admin received', docName);
const ydoc = docs.get(docName);
export const invalidateFromAdmin = async (ydoc) => {
if (ydoc) {
// eslint-disable-next-line no-console
console.log('Invalidating document', ydoc.name);

// As we are closing all connections, the ydoc will be removed from the docs map
ydoc.conns.forEach((_, c) => closeConn(ydoc, c));

return true;
} else {
// eslint-disable-next-line no-console
console.log('Document not found', docName);
console.log('No document to invalidate');
}
return false;
};
Expand All @@ -576,41 +570,35 @@ export const invalidateFromAdmin = async (docName) => {
* @param {TransactionalStorage} storage - The worker transactional storage object
* @returns {Promise<void>} - The return value of this
*/
export const setupWSConnection = async (conn, docName, env, storage) => {
const timingData = new Map();

export const setupWSConnection = async (conn, ydoc) => {
// eslint-disable-next-line no-param-reassign
conn.binaryType = 'arraybuffer';
// get doc, initialize if it does not exist yet
const doc = await getYDoc(docName, conn, env, storage, timingData, true);

// listen and reply to events
conn.addEventListener('message', (message) => messageListener(conn, doc, new Uint8Array(message.data)));
conn.addEventListener('message', (message) => messageListener(conn, ydoc, new Uint8Array(message.data)));

// Check if connection is still alive
conn.addEventListener('close', () => {
closeConn(doc, conn);
closeConn(ydoc, conn);
});
// put the following in a variables in a block so the interval handlers don't keep in in
// scope
try {
// send sync step 1
let encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeSyncStep1(encoder, doc);
send(doc, conn, encoding.toUint8Array(encoder));
const awarenessStates = doc.awareness.getStates();
syncProtocol.writeSyncStep1(encoder, ydoc);
send(ydoc, conn, encoding.toUint8Array(encoder));
const awarenessStates = ydoc.awareness.getStates();
if (awarenessStates.size > 0) {
encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarUint8Array(encoder, awarenessProtocol
.encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys())));
send(doc, conn, encoding.toUint8Array(encoder));
.encodeAwarenessUpdate(ydoc.awareness, Array.from(awarenessStates.keys())));
send(ydoc, conn, encoding.toUint8Array(encoder));
}
} catch (err) {
// eslint-disable-next-line no-console
console.error('Error in setupWSConnection', err);
}

return timingData;
};
26 changes: 10 additions & 16 deletions test/edge.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import assert from 'assert';

import * as Y from 'yjs';
import defaultEdge, { DocRoom, handleApiRequest, handleErrors } from '../src/edge.js';
import { WSSharedDoc, persistence, setYDoc } from '../src/shareddoc.js';
import { WSSharedDoc, persistence } from '../src/shareddoc.js';
import { doc2aem } from '../src/collab.js';

function hash(str) {
Expand Down Expand Up @@ -138,7 +138,6 @@ describe('Worker test suite', () => {
it('Docroom deleteFromAdmin', async () => {
const ydocName = 'http://foobar.com/q.html';
const testYdoc = new WSSharedDoc(ydocName);
const m = setYDoc(ydocName, testYdoc);

const connCalled = []
const mockConn = {
Expand All @@ -150,12 +149,10 @@ describe('Worker test suite', () => {
url: `${ydocName}?api=deleteAdmin`
};

const dr = new DocRoom({});
const dr = new DocRoom({}, {}, new Map([[ydocName, testYdoc]]));

assert(m.has(ydocName), 'Precondition');
const resp = await dr.fetch(req)
assert.equal(204, resp.status);
assert(!m.has(ydocName), 'Doc should have been removed');
assert.deepStrictEqual(['close'], connCalled);
});

Expand All @@ -172,8 +169,7 @@ describe('Worker test suite', () => {
it('Docroom syncFromAdmin', async () => {
const ydocName = 'http://foobar.com/a/b/c.html';
const testYdoc = new WSSharedDoc(ydocName);
const m = setYDoc(ydocName, testYdoc);


const connCalled = []
const mockConn = {
close() { connCalled.push('close'); }
Expand All @@ -184,12 +180,10 @@ describe('Worker test suite', () => {
url: `${ydocName}?api=syncAdmin`
};

const dr = new DocRoom({});
const dr = new DocRoom({}, {}, new Map([[ydocName, testYdoc]]));

assert(m.has(ydocName), 'Precondition');
const resp = await dr.fetch(req)
assert.equal(200, resp.status);
assert(!m.has(ydocName), 'Doc should have been removed');
assert.deepStrictEqual(['close'], connCalled);
});

Expand Down Expand Up @@ -255,14 +249,14 @@ describe('Worker test suite', () => {
assert.equal('au123', wsp1.auth);

const acceptIdx = wspCalled.indexOf('accept');
const alMessIdx = wspCalled.indexOf('addEventListener message');
const alClsIdx = wspCalled.indexOf('addEventListener close');
const alMessCount = wspCalled.filter(call => call === 'addEventListener message').length;
const alClsCount = wspCalled.filter(call => call === 'addEventListener close').length;
const clsIdx = wspCalled.indexOf('close');

assert(acceptIdx >= 0);
assert(alMessIdx > acceptIdx);
assert(alClsIdx > alMessIdx);
assert(clsIdx > alClsIdx);
assert(acceptIdx === 0);
assert(alMessCount === 1);
assert(alClsCount === 1);
assert(clsIdx === wspCalled.length - 1);
} finally {
DocRoom.newWebSocketPair = savedNWSP;
persistence.bindState = savedBS;
Expand Down
Loading