diff --git a/package-lock.json b/package-lock.json index a99a146..fcab7a7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { - "name": "y-mongodb-provider", - "version": "0.1.10", + "name": "y-mongodb-provider-crashsafe", + "version": "0.2.0", "lockfileVersion": 3, "requires": true, "packages": { "": { - "name": "y-mongodb-provider", - "version": "0.1.10", + "name": "y-mongodb-provider-crashsafe", + "version": "0.2.0", "license": "MIT", "dependencies": { "lib0": "^0.2.94", diff --git a/package.json b/package.json index a2da727..ea259a8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { - "name": "y-mongodb-provider", - "version": "0.2.0", + "name": "y-mongodb-provider-crashsafe", + "version": "0.2.4", "description": "MongoDB database adapter for Yjs", "type": "module", "main": "./dist/y-mongodb.cjs", @@ -23,12 +23,12 @@ }, "repository": { "type": "git", - "url": "git+https://github.com/MaxNoetzold/y-mongodb-provider.git" + "url": "git+https://github.com/lukenc/y-mongodb-provider.git" }, "author": "Max Nötzold ", "license": "MIT", "bugs": { - "url": "https://github.com/MaxNoetzold/y-mongodb-provider/issues" + "url": "https://github.com/lukenc/y-mongodb-provider/issues" }, "dependencies": { "lib0": "^0.2.94", @@ -54,7 +54,7 @@ "dist/*", "src/*" ], - "homepage": "https://github.com/MaxNoetzold/y-mongodb-provider#readme", + "homepage": "https://github.com/lukenc/y-mongodb-provider#readme", "keywords": [ "Yjs", "MongoDB", diff --git a/src/mongo-adapter.js b/src/mongo-adapter.js index 2f02f91..b3de161 100644 --- a/src/mongo-adapter.js +++ b/src/mongo-adapter.js @@ -8,8 +8,7 @@ import { MongoClient } from 'mongodb'; */ function getMongoDbDatabaseName(connectionString) { const url = new URL(connectionString); - const database = url.pathname.slice(1); - return database; + return url.pathname.slice(1); } export class MongoAdapter { @@ -64,6 +63,17 @@ export class MongoAdapter { } } + /** + * Ensure indexes for the collection to optimize queries. + * 只在首次写入时调用即可,无需判断是否已存在。 + * @param {string} collectionName + * @returns {Promise} + */ + async ensureIndexes(collectionName) { + const collection = this.db.collection(collectionName); + await collection.createIndex({ docName: 1, clock: 1 }); + } + /** * * @param {import('mongodb').Filter} query diff --git a/src/utils.js b/src/utils.js index 764e07e..da18f8c 100644 --- a/src/utils.js +++ b/src/utils.js @@ -188,6 +188,11 @@ export const storeUpdate = async (db, docName, update) => { Y.applyUpdate(ydoc, update); const sv = Y.encodeStateVector(ydoc); await writeStateVector(db, docName, sv, 0); + // ensure indexes on first write + if (typeof db.ensureIndexes === 'function') { + const collectionName = db._getCollectionName({ docName }); + await db.ensureIndexes(collectionName); + } } // mongodb has a maximum document size of 16MB; diff --git a/src/y-mongodb.js b/src/y-mongodb.js index 2faf04e..0a56bb3 100644 --- a/src/y-mongodb.js +++ b/src/y-mongodb.js @@ -4,6 +4,8 @@ import * as promise from 'lib0/promise'; import { MongoAdapter } from './mongo-adapter.js'; import * as U from './utils.js'; +const APPLY_FULL_STATUS = 'applyFull'; + export class MongodbPersistence { /** * Create a y-mongodb persistence instance. @@ -47,13 +49,14 @@ export class MongodbPersistence { this.tr = {}; /** - * Execute an transaction on a database. This will ensure that other processes are + * Execute a transaction on a database. This will ensure that other processes are * currently not writing. * * This is a private method and might change in the future. * * @template T * + * @param docName The name of the document * @param {function(MongoAdapter):Promise} f A transaction that receives the db object * @return {Promise} */ @@ -101,13 +104,40 @@ export class MongodbPersistence { return this._transact(docName, async (db) => { const updates = await U.getMongoUpdates(db, docName); const ydoc = new Y.Doc(); + let applyNum = 0; ydoc.transact(() => { for (let i = 0; i < updates.length; i++) { - Y.applyUpdate(ydoc, updates[i]); + try { + Y.applyUpdate(ydoc, updates[i]); + } catch (e) { + console.warn(`Failed to apply update ${i} to document "${docName}".`, e); + break; + } + applyNum += 1; } }); - if (updates.length > this.flushSize) { - await U.flushDocument(db, docName, Y.encodeStateAsUpdate(ydoc), Y.encodeStateVector(ydoc)); + // 检查是否所有更新都已应用 + const allUpdatesApplied = applyNum === updates.length; + + // 设置应用状态 + ydoc.getMap(APPLY_FULL_STATUS).set('status', allUpdatesApplied); + + if (allUpdatesApplied) { + // 判断是否需要执行文档刷新 + if (updates.length > this.flushSize) { + // 更新数量超过阈值且全部应用成功,执行文档刷新 + await U.flushDocument( + db, + docName, + Y.encodeStateAsUpdate(ydoc), + Y.encodeStateVector(ydoc), + ); + } + } else { + // 未能全部应用成功,记录警告 + console.log( + `Failed to apply all updates to document "${docName}". Applied ${applyNum}/${updates.length} updates.`, + ); } return ydoc; });