Skip to content
43 changes: 43 additions & 0 deletions src/admin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/

export default async function adminFetch(docName, method, auth, env, body) {
const { DAADMIN_API } = env;
/* c8 ignore start */
if (!DAADMIN_API) {
throw new Error('DAADMIN_API is not set');
}
/* c8 ignore end */
const headers = new Headers();
headers.set('X-DA-Initiator', 'collab');
if (auth) {
if (Array.isArray(auth)) {
headers.set('Authorization', [...new Set(auth)].join(','));
} else {
headers.set('Authorization', auth);
}
}

// if docname is a full url, we need to extract the pathname
let pathname = docName;
if (docName.startsWith('https://')) {
pathname = new URL(docName).pathname;
}
const url = new URL(pathname, DAADMIN_API);
const opts = { method, headers };
if (body) {
opts.body = body;
}
// eslint-disable-next-line no-console
console.log('da-collab fetches from da-admin', url.toString(), method);
return fetch(url, opts);
}
12 changes: 3 additions & 9 deletions src/edge.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* governing permissions and limitations under the License.
*/
import { invalidateFromAdmin, setupWSConnection } from './shareddoc.js';
import adminFetch from './admin.js';

// This is the Edge Worker, built using Durable Objects!

Expand Down Expand Up @@ -61,11 +62,9 @@ async function adminAPI(api, url, request, env) {

// A simple Ping API to check that the worker responds.
function ping(env) {
const adminsb = env.daadmin !== undefined ? '"da-admin"' : '';

const json = `{
"status": "ok",
"service_bindings": [${adminsb}]
"admin_api": "${env.DAADMIN_API || ''}"
}
`;
return new Response(json, { status: 200 });
Expand Down Expand Up @@ -130,13 +129,8 @@ export async function handleApiRequest(request, env) {
// Check if we have the authorization for the room (this is a poor man's solution as right now
// only da-admin knows).
try {
const opts = { method: 'HEAD' };
if (auth) {
opts.headers = new Headers({ Authorization: auth });
}

const timingBeforeDaAdminHead = Date.now();
const initialReq = await env.daadmin.fetch(docName, opts);
const initialReq = await adminFetch(docName, 'HEAD', auth, env);

// this seems to be required by CloudFlare to consider the request as completed
await initialReq.text();
Expand Down
34 changes: 11 additions & 23 deletions src/shareddoc.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as encoding from 'lib0/encoding.js';
import * as decoding from 'lib0/decoding.js';
import debounce from 'lodash/debounce.js';
import { aem2doc, doc2aem, EMPTY_DOC } from './collab.js';
import adminFetch from './admin.js';

const wsReadyStateConnecting = 0;
const wsReadyStateOpen = 1;
Expand Down Expand Up @@ -195,15 +196,10 @@ export const persistence = {
* returned.
* @param {string} docName - The document name
* @param {string} auth - The authorization header
* @param {object} daadmin - The da-admin worker service binding
* @returns {Promise<string>} - The content of the document
*/
get: async (docName, auth, daadmin) => {
const initalOpts = {};
if (auth) {
initalOpts.headers = new Headers({ Authorization: auth });
}
const initialReq = await daadmin.fetch(docName, initalOpts);
get: async (docName, auth, env) => {
const initialReq = await adminFetch(docName, 'GET', auth, env);
if (initialReq.ok) {
return initialReq.text();
} else if (initialReq.status === 404) {
Expand All @@ -222,13 +218,12 @@ export const persistence = {
* @param {string} content - The content to store
* @returns {object} The response from da-admin.
*/
put: async (ydoc, content) => {
put: async (ydoc, content, env) => {
const blob = new Blob([content], { type: 'text/html' });

const formData = new FormData();
formData.append('data', blob);

const opts = { method: 'PUT', body: formData };
const keys = Array.from(ydoc.conns.keys());
const allReadOnly = keys.length > 0 && keys.every((con) => con.readOnly === true);
if (allReadOnly) {
Expand All @@ -240,21 +235,14 @@ export const persistence = {
.filter((con) => con.readOnly !== true)
.map((con) => con.auth);

if (auth.length > 0) {
opts.headers = new Headers({
Authorization: [...new Set(auth)].join(','),
'X-DA-Initiator': 'collab',
});
}

if (blob.size < 84) {
// eslint-disable-next-line no-console
console.warn('[docroom] Writting back an empty document', ydoc.name, blob.size);
}

const {
ok, status, statusText, body,
} = await ydoc.daadmin.fetch(ydoc.name, opts);
} = await adminFetch(ydoc.name, 'PUT', auth, env, formData);

if (body) {
// tell CloudFlare to consider the request as completed
Expand All @@ -275,13 +263,13 @@ export const persistence = {
* obtained from da-admin
* @returns {string} - the new content of the document in da-admin.
*/
update: async (ydoc, current) => {
update: async (ydoc, current, env) => {
let closeAll = false;
try {
const content = doc2aem(ydoc);
if (current !== content) {
// Only store the document if it was actually changed.
const { ok, status, statusText } = await persistence.put(ydoc, content);
const { ok, status, statusText } = await persistence.put(ydoc, content, env);

if (!ok) {
closeAll = (status === 401 || status === 403);
Expand Down Expand Up @@ -310,7 +298,7 @@ export const persistence = {
* @param {WebSocket} conn - the websocket connection
* @param {TransactionalStorage} storage - the worker transactional storage object
*/
bindState: async (docName, ydoc, conn, storage) => {
bindState: async (docName, ydoc, conn, storage, env) => {
let timingReadStateDuration;
let timingDaAdminGetDuration;

Expand All @@ -319,7 +307,7 @@ export const persistence = {
try {
let newDoc = false;
const timingBeforeDaAdminGet = Date.now();
current = await persistence.get(docName, conn.auth, ydoc.daadmin);
current = await persistence.get(docName, conn.auth, env);
timingDaAdminGetDuration = Date.now() - timingBeforeDaAdminGet;

const timingBeforeReadState = Date.now();
Expand Down Expand Up @@ -401,7 +389,7 @@ export const persistence = {
// If we receive an update on the document, store it in da-admin, but debounce it
// to avoid excessive da-admin calls.
if (current && ydoc === docs.get(docName)) {
current = await persistence.update(ydoc, current);
current = await persistence.update(ydoc, current, env);
}
}, 2000, { maxWait: 10000 }));

Expand Down Expand Up @@ -490,7 +478,7 @@ export const getYDoc = async (docname, conn, env, storage, timingData, gc = true
if (!doc.promise) {
// The doc is not yet bound to the persistence layer, do so now. The promise will be resolved
// when bound.
doc.promise = persistence.bindState(docname, doc, conn, storage);
doc.promise = persistence.bindState(docname, doc, conn, storage, env);
}

// We wait for the promise, for second and subsequent connections to the same doc, this will
Expand Down
Loading