diff --git a/CHANGELOG-Unreleased.md b/CHANGELOG-Unreleased.md index 7c1dc252c..eba13ed75 100644 --- a/CHANGELOG-Unreleased.md +++ b/CHANGELOG-Unreleased.md @@ -2,6 +2,11 @@ ### BREAKING CHANGES +#### 2024-01-17 + +- SyncPushArgs was renamed to SyncPushChangesArgs to free SyncPushArgs which is now used for push sync. +- lastPulletAt in pushChanges is no longer forced to be defined + ### Deprecations ### New features diff --git a/src/RawRecord/index.d.ts b/src/RawRecord/index.d.ts index 54ce69bad..2a5b9e848 100644 --- a/src/RawRecord/index.d.ts +++ b/src/RawRecord/index.d.ts @@ -11,6 +11,8 @@ export type DirtyRaw = { [key: string]: any } type _RawRecord = { id: RecordId _status: SyncStatus + // _changed is used by default pull conflict resolution and determines columns for which local + // changes will override remote changes _changed: string } diff --git a/src/sync/impl/__tests__/markAsSynced.test.js b/src/sync/impl/__tests__/markAsSynced.test.js index db2787641..4c6a87bd6 100644 --- a/src/sync/impl/__tests__/markAsSynced.test.js +++ b/src/sync/impl/__tests__/markAsSynced.test.js @@ -131,7 +131,7 @@ describe('markLocalChangesAsSynced', () => { }) // test that second push will mark all as synced - await markLocalChangesAsSynced(database, localChanges2) + await markLocalChangesAsSynced(database, localChanges2, false) expect(destroyDeletedRecordsSpy).toHaveBeenCalledTimes(2) expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) @@ -146,7 +146,7 @@ describe('markLocalChangesAsSynced', () => { const localChanges = await fetchLocalChanges(database) // mark as synced - await markLocalChangesAsSynced(database, localChanges, { + await markLocalChangesAsSynced(database, localChanges, false, { mock_projects: ['pCreated1', 'pUpdated'], mock_comments: ['cDeleted'], }) @@ -161,6 +161,30 @@ describe('markLocalChangesAsSynced', () => { ) expect(await allDeletedRecords([comments])).toEqual(['cDeleted']) }) + it(`marks only acceptedIds as synced`, async () => { + const { database, comments } = makeDatabase() + + const { pCreated1, pUpdated } = await makeLocalChanges(database) + const localChanges = await fetchLocalChanges(database) + + // mark as synced + await markLocalChangesAsSynced(database, localChanges, true, {}, { + // probably better solution exists (we essentially list all but expected in verify) + mock_projects: ['pCreated2', 'pDeleted'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tCreated', 'tUpdated', 'tDeleted'], + }) + + // verify + const localChanges2 = await fetchLocalChanges(database) + expect(localChanges2.changes).toEqual( + makeChangeSet({ + mock_projects: { created: [pCreated1._raw], updated: [pUpdated._raw] }, + mock_comments: { deleted: ['cDeleted'] }, + }), + ) + expect(await allDeletedRecords([comments])).toEqual(['cDeleted']) + }) it(`can mark records as synced when ids are per-table not globally unique`, async () => { const { database, projects, tasks, comments } = makeDatabase() @@ -202,4 +226,45 @@ describe('markLocalChangesAsSynced', () => { it.skip('only returns changed fields', async () => { // TODO: Possible future improvement? }) + describe('pushConflictResolver', () => { + it('marks local changes as synced', async () => { + const { database, tasks } = makeDatabase() + + await makeLocalChanges(database) + + await markLocalChangesAsSynced(database, await fetchLocalChanges(database), false, null, null, + (_table, local, remote, resolved) => { + if (local.id !== 'tCreated' || (remote && remote.changeMe !== true)) { + return resolved + } + resolved.name = remote.name + resolved._status = 'updated' + return resolved + }, + { + mock_tasks: [ + { + id: 'tCreated', + name: 'I shall prevail', + changeMe: true, + }, + ], + }) + + await expectSyncedAndMatches(tasks, 'tCreated', { + _status: 'updated', + name: 'I shall prevail', // concat of remote and local change + }) + + // should be untouched + await expectSyncedAndMatches(tasks, 'tUpdated', { + _status: 'synced', + name: 'local', + position: 100, + description: 'orig', + project_id: 'orig', + }) + + }) + }) }) diff --git a/src/sync/impl/__tests__/synchronize-partialRejections.test.js b/src/sync/impl/__tests__/synchronize-partialRejections.test.js index c45f85a80..74700a78d 100644 --- a/src/sync/impl/__tests__/synchronize-partialRejections.test.js +++ b/src/sync/impl/__tests__/synchronize-partialRejections.test.js @@ -62,4 +62,71 @@ describe('synchronize - partial push rejections', () => { }), ) }) + it(`can partially accept a push`, async () => { + const { database } = makeDatabase() + + const { tCreated, tUpdated } = await makeLocalChanges(database) + + const acceptedIds = Object.freeze({ + // probably better solution exists (we essentially list all but expected in expect below) + mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tDeleted'], + }) + const rejectedIds = Object.freeze({ + mock_tasks: ['tCreated', 'tUpdated'], + mock_comments: ['cDeleted'], + }) + const log = {} + await synchronize({ + database, + pullChanges: jest.fn(emptyPull()), + pushChanges: jest.fn(() => ({ experimentalAcceptedIds: acceptedIds })), + pushShouldConfirmOnlyAccepted: true, + log, + }) + expect((await fetchLocalChanges(database)).changes).toEqual( + makeChangeSet({ + mock_tasks: { created: [tCreated._raw], updated: [tUpdated._raw] }, + mock_comments: { deleted: ['cDeleted'] }, + }), + ) + expect(log.rejectedIds).toStrictEqual(rejectedIds) + }) + it(`can partially accept a push and make changes during push`, async () => { + const { database, comments } = makeDatabase() + + const { pCreated1, tUpdated } = await makeLocalChanges(database) + const pCreated1Raw = { ...pCreated1._raw } + let newComment + await synchronize({ + database, + pullChanges: jest.fn(emptyPull()), + pushChanges: jest.fn(async () => { + await database.write(async () => { + await pCreated1.update((p) => { + p.name = 'updated!' + }) + newComment = await comments.create((c) => { + c.body = 'bazinga' + }) + }) + return { + experimentalAcceptedIds: { + mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'], + mock_comments: ['cCreated', 'cUpdated'], + mock_tasks: ['tCreated', 'tDeleted'], + }, + } + }), + pushShouldConfirmOnlyAccepted: true, + }) + expect((await fetchLocalChanges(database)).changes).toEqual( + makeChangeSet({ + mock_projects: { created: [{ ...pCreated1Raw, _changed: 'name', name: 'updated!' }] }, + mock_tasks: { updated: [tUpdated._raw] }, + mock_comments: { created: [newComment._raw], deleted: ['cDeleted'] }, + }), + ) + }) }) diff --git a/src/sync/impl/__tests__/synchronize.test.js b/src/sync/impl/__tests__/synchronize.test.js index 475a111a9..04daa5c08 100644 --- a/src/sync/impl/__tests__/synchronize.test.js +++ b/src/sync/impl/__tests__/synchronize.test.js @@ -14,7 +14,7 @@ import { emptyPull, } from './helpers' -import { synchronize, hasUnsyncedChanges } from '../../index' +import { synchronize, optimisticSyncPush, hasUnsyncedChanges } from '../../index' import { fetchLocalChanges, getLastPulledAt } from '../index' const observeDatabase = (database) => { @@ -151,6 +151,20 @@ describe('synchronize', () => { expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) expect(log.localChangeCount).toBe(10) }) + it('can do push-only sync', async () => { + const { database } = makeDatabase() + + await makeLocalChanges(database) + const localChanges = await fetchLocalChanges(database) + + const pushChanges = jest.fn() + const log = {} + await optimisticSyncPush({ database, pushChanges, log }) + + expect(pushChanges).toHaveBeenCalledWith({ changes: localChanges.changes, lastPulledAt: null }) + expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges) + expect(log.localChangeCount).toBe(10) + }) it('can pull changes', async () => { const { database, projects, tasks } = makeDatabase() @@ -571,4 +585,38 @@ describe('synchronize', () => { it.skip(`only emits one collection batch change`, async () => { // TODO: unskip when batch change emissions are implemented }) + it(`allows push conflict resolution to be customized`, async () => { + const { database, tasks } = makeDatabase() + const task = tasks.prepareCreateFromDirtyRaw({ + id: 't1', + name: 'Task name', + position: 1, + is_completed: false, + project_id: 'p1', + }) + await database.write(() => database.batch(task)) + + const pushConflictResolver = jest.fn((_table, local, remote, resolved) => { + return resolved + }) + + await synchronize({ + database, + pullChanges: () => ({ + timestamp: 1500, + }), + pushChanges: async () => { + return { + pushResultSet: { + mock_tasks: [ + {id: 't1'}, + ], + }, + } + }, + pushConflictResolver, + }) + + expect(pushConflictResolver).toHaveBeenCalledTimes(1) + }) }) diff --git a/src/sync/impl/applyRemote.js b/src/sync/impl/applyRemote.js index c8a988172..8e619d53e 100644 --- a/src/sync/impl/applyRemote.js +++ b/src/sync/impl/applyRemote.js @@ -11,7 +11,6 @@ import type { Collection, Model, TableName, - DirtyRaw, Query, RawRecord, } from '../..' @@ -25,7 +24,7 @@ import type { SyncConflictResolver, SyncPullStrategy, } from '../index' -import { prepareCreateFromRaw, prepareUpdateFromRaw, recordFromRaw } from './helpers' +import { prepareCreateFromRaw, prepareUpdateFromRaw, recordFromRaw, validateRemoteRaw } from './helpers' type ApplyRemoteChangesContext = $Exact<{ db: Database, @@ -249,15 +248,6 @@ const getAllRecordsToApply = ( ) } -function validateRemoteRaw(raw: DirtyRaw): void { - // TODO: I think other code is actually resilient enough to handle illegal _status and _changed - // would be best to change that part to a warning - but tests are needed - invariant( - raw && typeof raw === 'object' && 'id' in raw && !('_status' in raw || '_changed' in raw), - `[Sync] Invalid raw record supplied to Sync. Records must be objects, must have an 'id' field, and must NOT have a '_status' or '_changed' fields`, - ) -} - function prepareApplyRemoteChangesToCollection( recordsToApply: RecordsToApplyRemoteChangesTo, collection: Collection, diff --git a/src/sync/impl/helpers.d.ts b/src/sync/impl/helpers.d.ts index 746efbc87..02ed0acd7 100644 --- a/src/sync/impl/helpers.d.ts +++ b/src/sync/impl/helpers.d.ts @@ -1,12 +1,20 @@ import type { Model, Collection, Database } from '../..' import type { RawRecord, DirtyRaw } from '../../RawRecord' -import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index' +import type { + SyncLog, + SyncDatabaseChangeSet, + SyncShouldUpdateRecord, + SyncConflictResolver, + SyncPushResultSet, +} from '../index' // Returns raw record with naive solution to a conflict based on local `_changed` field // This is a per-column resolution algorithm. All columns that were changed locally win // and will be applied on top of the remote version. export function resolveConflict(local: RawRecord, remote: DirtyRaw): DirtyRaw +export function validateRemoteRaw(raw: DirtyRaw): void + export function prepareCreateFromRaw( collection: Collection, dirtyRaw: DirtyRaw, @@ -19,7 +27,11 @@ export function prepareUpdateFromRaw( conflictResolver?: SyncConflictResolver, ): T -export function prepareMarkAsSynced(record: T): T +export function prepareMarkAsSynced( + record: T, + pushConflictResolver?: SyncConflictResolver, + remoteDirtyRaw?: DirtyRaw, +): T export function ensureSameDatabase(database: Database, initialResetCount: number): void diff --git a/src/sync/impl/helpers.js b/src/sync/impl/helpers.js index df9d32d19..1e24b479f 100644 --- a/src/sync/impl/helpers.js +++ b/src/sync/impl/helpers.js @@ -4,9 +4,14 @@ import { values } from '../../utils/fp' import areRecordsEqual from '../../utils/fp/areRecordsEqual' import { invariant } from '../../utils/common' -import type { Model, Collection, Database } from '../..' +import type { Model, Collection, Database, TableName, RecordId } from '../..' import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../../RawRecord' -import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index' +import type { + SyncIds, + SyncLog, + SyncDatabaseChangeSet, + SyncConflictResolver, +} from '../index' // Returns raw record with naive solution to a conflict based on local `_changed` field // This is a per-column resolution algorithm. All columns that were changed locally win @@ -37,6 +42,15 @@ export function resolveConflict(local: RawRecord, remote: DirtyRaw): DirtyRaw { return resolved } +export function validateRemoteRaw(raw: DirtyRaw): void { + // TODO: I think other code is actually resilient enough to handle illegal _status and _changed + // would be best to change that part to a warning - but tests are needed + invariant( + raw && typeof raw === 'object' && 'id' in raw && !('_status' in raw || '_changed' in raw), + `[Sync] Invalid raw record supplied to Sync. Records must be objects, must have an 'id' field, and must NOT have a '_status' or '_changed' fields`, + ) +} + function replaceRaw(record: Model, dirtyRaw: DirtyRaw): void { record._raw = sanitizedRaw(dirtyRaw, record.collection.schema) } @@ -120,9 +134,16 @@ export function prepareUpdateFromRaw( }) } -export function prepareMarkAsSynced(record: T): T { +export function prepareMarkAsSynced( + record: T, + pushConflictResolver?: ?SyncConflictResolver, + remoteDirtyRaw?: ?DirtyRaw, +): T { // $FlowFixMe - const newRaw = Object.assign({}, record._raw, { _status: 'synced', _changed: '' }) // faster than object spread + let newRaw = Object.assign({}, record._raw, { _status: 'synced', _changed: '' }) // faster than object spread + if (pushConflictResolver) { + newRaw = pushConflictResolver(record.collection.table, record._raw, remoteDirtyRaw, newRaw) + } // $FlowFixMe return record.prepareUpdate(() => { replaceRaw(record, newRaw) @@ -148,3 +169,46 @@ export const changeSetCount: (SyncDatabaseChangeSet) => number = (changeset) => ({ created, updated, deleted }) => created.length + updated.length + deleted.length, ), ) + +const extractChangeSetIds: (SyncDatabaseChangeSet) => { [TableName]: RecordId[] } = (changeset) => + Object.keys(changeset).reduce((acc: { [TableName]: RecordId[] }, key: string) => { + // $FlowFixMe + const { created, updated, deleted } = changeset[key] + // $FlowFixMe + acc[key] = [ + ...created.map(it => it.id), + ...updated.map(it => it.id), + ...deleted, + ] + return acc + }, {}) + +// Returns all rejected ids and is used when accepted ids are used +export const findRejectedIds: + (?SyncIds, ?SyncIds, SyncDatabaseChangeSet) => SyncIds = + (experimentalRejectedIds, experimentalAcceptedIds, changeset) => { + const localIds = extractChangeSetIds(changeset) + + const acceptedIdsSets = Object.keys(changeset).reduce( + (acc: { [TableName]: Set }, key: string) => { + // $FlowFixMe + acc[key] = new Set(experimentalAcceptedIds[key]) + return acc + }, {}) + + return Object.keys(changeset).reduce((acc: { [TableName]: RecordId[] }, key: string) => { + const rejectedIds = [ + // $FlowFixMe + ...(experimentalRejectedIds ? experimentalRejectedIds[key] || [] : []), + // $FlowFixMe + ...(localIds[key] || []), + // $FlowFixMe + ].filter(it => !acceptedIdsSets[key].has(it)) + + if (rejectedIds.length > 0) { + // $FlowFixMe + acc[key] = rejectedIds + } + return acc + }, {}) + } \ No newline at end of file diff --git a/src/sync/impl/markAsSynced.d.ts b/src/sync/impl/markAsSynced.d.ts index 242ec0356..5d5797e01 100644 --- a/src/sync/impl/markAsSynced.d.ts +++ b/src/sync/impl/markAsSynced.d.ts @@ -1,9 +1,13 @@ import type { Database, Model, TableName } from '../..' -import type { SyncLocalChanges, SyncRejectedIds } from '../index' +import type { SyncLocalChanges, SyncIds, SyncConflictResolver, SyncPushResultSet } from '../index' export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, - rejectedIds?: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + rejectedIds?: SyncIds, + allAcceptedIds?: SyncIds, + pushConflictResolver?: SyncConflictResolver, + remoteDirtyRaws?: SyncPushResultSet, ): Promise diff --git a/src/sync/impl/markAsSynced.js b/src/sync/impl/markAsSynced.js index 8a3b6c13e..80d7e3144 100644 --- a/src/sync/impl/markAsSynced.js +++ b/src/sync/impl/markAsSynced.js @@ -4,12 +4,14 @@ import areRecordsEqual from '../../utils/fp/areRecordsEqual' import { logError } from '../../utils/common' import type { Database, Model, TableName } from '../..' -import { prepareMarkAsSynced } from './helpers' -import type { SyncLocalChanges, SyncRejectedIds } from '../index' +import { prepareMarkAsSynced, validateRemoteRaw } from './helpers' +import type { SyncLocalChanges, SyncIds, SyncConflictResolver, SyncPushResultSet } from '../index' const recordsToMarkAsSynced = ( { changes, affectedRecords }: SyncLocalChanges, - allRejectedIds: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + allRejectedIds: SyncIds, + allAcceptedIds: SyncIds, ): Model[] => { const syncedRecords = [] @@ -17,6 +19,7 @@ const recordsToMarkAsSynced = ( const { created, updated } = changes[(table: any)] const raws = created.concat(updated) const rejectedIds = new Set(allRejectedIds[(table: any)]) + const acceptedIds = new Set(allAcceptedIds[(table: any)] || []) raws.forEach((raw) => { const { id } = raw @@ -27,7 +30,8 @@ const recordsToMarkAsSynced = ( ) return } - if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id)) { + const isAccepted = !allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id) + if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id) && isAccepted) { syncedRecords.push(record) } }) @@ -38,27 +42,49 @@ const recordsToMarkAsSynced = ( const destroyDeletedRecords = ( db: Database, { changes }: SyncLocalChanges, - allRejectedIds: SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + allRejectedIds: SyncIds, + allAcceptedIds: SyncIds, ): Promise[] => Object.keys(changes).map((_tableName) => { const tableName: TableName = (_tableName: any) const rejectedIds = new Set(allRejectedIds[tableName]) - const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id)) + const acceptedIds = new Set(allAcceptedIds[tableName] || []) + const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id) && + (!allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id))) return deleted.length ? db.adapter.destroyDeletedRecords(tableName, deleted) : Promise.resolve() }) export default function markLocalChangesAsSynced( db: Database, syncedLocalChanges: SyncLocalChanges, - rejectedIds?: ?SyncRejectedIds, + allowOnlyAcceptedIds: boolean, + rejectedIds?: ?SyncIds, + allAcceptedIds?: ?SyncIds, + pushConflictResolver?: ?SyncConflictResolver, + remoteDirtyRaws?: ?SyncPushResultSet, ): Promise { return db.write(async () => { // update and destroy records concurrently await Promise.all([ db.batch( - recordsToMarkAsSynced(syncedLocalChanges, rejectedIds || {}).map(prepareMarkAsSynced), + recordsToMarkAsSynced(syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, + allAcceptedIds || {}).map(it => { + // if pushConflictResolver is not set, lookup by remote raws isn't necessary + if (!pushConflictResolver || !remoteDirtyRaws) { + return prepareMarkAsSynced(it, null, null) + } + const collectionRemoteDirtyRaws = remoteDirtyRaws[it.collection.modelClass.table] + if (!collectionRemoteDirtyRaws) { + return prepareMarkAsSynced(it, null, null) + } + const remoteDirtyRaw = collectionRemoteDirtyRaws.find(dirtyRaw => dirtyRaw.id === it.id) + remoteDirtyRaw && validateRemoteRaw(remoteDirtyRaw) + return prepareMarkAsSynced(it, pushConflictResolver, remoteDirtyRaw) + }), ), - ...destroyDeletedRecords(db, syncedLocalChanges, rejectedIds || {}), + ...destroyDeletedRecords(db, syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {}, + allAcceptedIds || {}), ]) }, 'sync-markLocalChangesAsSynced') } diff --git a/src/sync/impl/synchronize.d.ts b/src/sync/impl/synchronize.d.ts index f19c96957..0dfbc8e05 100644 --- a/src/sync/impl/synchronize.d.ts +++ b/src/sync/impl/synchronize.d.ts @@ -1,4 +1,4 @@ -import type { SyncArgs } from '../index' +import type { SyncArgs, OptimisticSyncPushArgs } from '../index' export default function synchronize({ database, @@ -9,6 +9,16 @@ export default function synchronize({ migrationsEnabledAtVersion, log, conflictResolver, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise + +export function optimisticSyncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, +}: OptimisticSyncPushArgs): Promise diff --git a/src/sync/impl/synchronize.js b/src/sync/impl/synchronize.js index eef1f6e81..3bf6e0de1 100644 --- a/src/sync/impl/synchronize.js +++ b/src/sync/impl/synchronize.js @@ -11,8 +11,53 @@ import { setLastPulledSchemaVersion, getMigrationInfo, } from './index' -import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers' -import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index' +import { ensureSameDatabase, isChangeSetEmpty, changeSetCount, findRejectedIds } from './helpers' +import type { + SyncArgs, + OptimisticSyncPushArgs, + SyncPushArgs, + Timestamp, + SyncPullStrategy, +} from '../index' + +async function syncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, + resetCount, + lastPulledAt, +}: SyncPushArgs): Promise { + if (pushChanges) { + log && (log.phase = 'ready to fetch local changes') + + const localChanges = await fetchLocalChanges(database) + log && (log.localChangeCount = changeSetCount(localChanges.changes)) + log && (log.phase = 'fetched local changes') + + ensureSameDatabase(database, resetCount) + if (!isChangeSetEmpty(localChanges.changes)) { + log && (log.phase = 'ready to push') + const pushResult = + (await pushChanges({ changes: localChanges.changes, lastPulledAt })) || {} + log && (log.phase = 'pushed') + log && (log.rejectedIds = pushResult.experimentalRejectedIds) + if (log && pushShouldConfirmOnlyAccepted) { + log.rejectedIds = findRejectedIds(pushResult.experimentalRejectedIds, + pushResult.experimentalAcceptedIds, localChanges.changes) + } + + ensureSameDatabase(database, resetCount) + await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false, + pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds, pushConflictResolver, + pushResult.pushResultSet) + log && (log.phase = 'marked local changes as synced') + } + } else { + log && (log.phase = 'pushChanges not defined') + } +} export default async function synchronize({ database, @@ -24,6 +69,8 @@ export default async function synchronize({ migrationsEnabledAtVersion, log, conflictResolver, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, _unsafeBatchPerCollection, unsafeTurbo, }: SyncArgs): Promise { @@ -120,29 +167,39 @@ export default async function synchronize({ }, 'sync-synchronize-apply') // push phase - if (pushChanges) { - log && (log.phase = 'ready to fetch local changes') - - const localChanges = await fetchLocalChanges(database) - log && (log.localChangeCount = changeSetCount(localChanges.changes)) - log && (log.phase = 'fetched local changes') - - ensureSameDatabase(database, resetCount) - if (!isChangeSetEmpty(localChanges.changes)) { - log && (log.phase = 'ready to push') - const pushResult = - (await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {} - log && (log.phase = 'pushed') - log && (log.rejectedIds = pushResult.experimentalRejectedIds) - - ensureSameDatabase(database, resetCount) - await markLocalChangesAsSynced(database, localChanges, pushResult.experimentalRejectedIds) - log && (log.phase = 'marked local changes as synced') - } - } else { - log && (log.phase = 'pushChanges not defined') - } + await syncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, + resetCount, + lastPulledAt: newLastPulledAt, + }) log && (log.finishedAt = new Date()) log && (log.phase = 'done') } + +export async function optimisticSyncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, +}: OptimisticSyncPushArgs): Promise { + const resetCount = database._resetCount + + const lastPulledAt = await getLastPulledAt(database) + log && (log.lastPulledAt = lastPulledAt) + + await syncPush({ + database, + pushChanges, + log, + pushShouldConfirmOnlyAccepted, + pushConflictResolver, + resetCount, + lastPulledAt, + }) +} \ No newline at end of file diff --git a/src/sync/index.d.ts b/src/sync/index.d.ts index b4449d31f..b053cc5d4 100644 --- a/src/sync/index.d.ts +++ b/src/sync/index.d.ts @@ -27,11 +27,19 @@ export type SyncPullResult = | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> -export type SyncRejectedIds = { [tableName: TableName]: RecordId[] } +export type SyncIds = { [tableName: TableName]: RecordId[] } -export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> +export type SyncRejectedIds = SyncIds -export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }> +export type SyncPushChangesArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }> + +export type SyncPushResultSet = { [tableName: TableName]: DirtyRaw[] } + +export type SyncPushResult = $Exact<{ + experimentalRejectedIds?: SyncIds, + experimentalAcceptedIds?: SyncIds, + pushResultSet?: SyncPushResultSet, +}> type SyncConflict = $Exact<{ local: DirtyRaw; remote: DirtyRaw; resolved: DirtyRaw }> export type SyncLog = { @@ -41,7 +49,7 @@ export type SyncLog = { migration?: MigrationSyncChanges; newLastPulledAt?: number; resolvedConflicts?: SyncConflict[]; - rejectedIds?: SyncRejectedIds; + rejectedIds?: SyncIds; finishedAt?: Date; remoteChangeCount?: number; localChangeCount?: number; @@ -56,10 +64,31 @@ export type SyncConflictResolver = ( resolved: DirtyRaw, ) => DirtyRaw -export type SyncArgs = $Exact<{ +export type OptimisticSyncPushArgs = $Exact<{ + database: Database; + pushChanges?: (_: SyncPushChangesArgs) => Promise; + log?: SyncLog; + // experimental customization that will cause to only set records as synced if we return id. + // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to + // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that + // unpredicted error will cause data loss (when failed data push isn't re-pushed) + pushShouldConfirmOnlyAccepted?: boolean; + // conflict resolver on push side of sync which also requires returned records from backend. + // This is also useful for multi-step sync where one must control in which state sync is and if it + // must be repeated. + // Note that by default _status will be still synced so update if required + // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. + pushConflictResolver?: SyncConflictResolver; +}> + +export type SyncPushArgs = $Exact<{OptimisticSyncPushArgs}> & $Exact<{ + resetCount: number; + lastPulledAt: Timestamp; +}> + +export type SyncArgs = $Exact<{OptimisticSyncPushArgs}> & $Exact<{ database: Database; pullChanges: (_: SyncPullArgs) => Promise; - pushChanges?: (_: SyncPushArgs) => Promise; // version at which support for migration syncs was added - the version BEFORE first syncable migration migrationsEnabledAtVersion?: SchemaVersion; sendCreatedAsUpdated?: boolean; @@ -90,4 +119,6 @@ export type SyncArgs = $Exact<{ export function synchronize(args: SyncArgs): Promise +export function optimisticSyncPush(args: OptimisticSyncPushArgs): Promise + export function hasUnsyncedChanges({ database }: $Exact<{ database: Database }>): Promise diff --git a/src/sync/index.js b/src/sync/index.js index fcce9b80d..379a93944 100644 --- a/src/sync/index.js +++ b/src/sync/index.js @@ -53,11 +53,19 @@ export type SyncPullResult = | $Exact<{ syncJson: string }> | $Exact<{ syncJsonId: number }> -export type SyncRejectedIds = { [TableName]: RecordId[] } +export type SyncIds = { [TableName]: RecordId[] } -export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }> +export type SyncRejectedIds = SyncIds -export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }> +export type SyncPushChangesArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt?: ?Timestamp }> + +export type SyncPushResultSet = { [tableName: TableName]: DirtyRaw[] } + +export type SyncPushResult = $Exact<{ + experimentalRejectedIds?: SyncIds, + experimentalAcceptedIds?: SyncIds, + pushResultSet?: SyncPushResultSet, +}> type SyncConflict = $Exact<{ local: DirtyRaw, remote: DirtyRaw, resolved: DirtyRaw }> export type SyncLog = { @@ -67,7 +75,7 @@ export type SyncLog = { migration?: ?MigrationSyncChanges, newLastPulledAt?: number, resolvedConflicts?: SyncConflict[], - rejectedIds?: SyncRejectedIds, + rejectedIds?: SyncIds, finishedAt?: Date, remoteChangeCount?: number, localChangeCount?: number, @@ -75,6 +83,12 @@ export type SyncLog = { error?: Error, } +export type SyncShouldUpdateRecord = ( + table: TableName, + local: DirtyRaw, + remote: DirtyRaw, +) => boolean + export type SyncConflictResolver = ( table: TableName, local: DirtyRaw, @@ -82,37 +96,63 @@ export type SyncConflictResolver = ( resolved: DirtyRaw, ) => DirtyRaw +export type OptimisticSyncPushArgs = $Exact<{ + database: Database; + pushChanges?: (_: SyncPushChangesArgs) => Promise; + log?: SyncLog; + // experimental customization that will cause to only set records as synced if we return id. + // This will in turn cause all records to be re-pushed if id wasn't returned. This allows to + // "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that + // unpredicted error will cause data loss (when failed data push isn't re-pushed) + pushShouldConfirmOnlyAccepted?: boolean; + // conflict resolver on push side of sync which also requires returned records from backend. + // This is also useful for multi-step sync where one must control in which state sync is and if it + // must be repeated. + // Note that by default _status will be still synced so update if required + // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. + pushConflictResolver?: SyncConflictResolver; +}> + +export type SyncPushArgs = $Exact<{ + ...OptimisticSyncPushArgs; + resetCount: number; + lastPulledAt?: ?Timestamp; +}> + // TODO: JSDoc'ify this export type SyncArgs = $Exact<{ - database: Database, - pullChanges: (SyncPullArgs) => Promise, - pushChanges?: (SyncPushArgs) => Promise, + ...OptimisticSyncPushArgs; + database: Database; + pullChanges: (_: SyncPullArgs) => Promise; // version at which support for migration syncs was added - the version BEFORE first syncable migration - migrationsEnabledAtVersion?: SchemaVersion, - sendCreatedAsUpdated?: boolean, - log?: SyncLog, + migrationsEnabledAtVersion?: SchemaVersion; + sendCreatedAsUpdated?: boolean; + log?: SyncLog; + // Advanced (unsafe) customization point. Useful when doing per record conflict resolution and can + // determine directly from remote and local if we can keep local. + shouldUpdateRecord?: SyncShouldUpdateRecord; // Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple // columns and want to have them updated consistently, or to implement partial sync // It's called for every record being updated locally, so be sure that this function is FAST. // If you don't want to change default behavior for a given record, return `resolved` as is // Note that it's safe to mutate `resolved` object, so you can skip copying it for performance. - conflictResolver?: SyncConflictResolver, + conflictResolver?: SyncConflictResolver; // commits changes in multiple batches, and not one - temporary workaround for memory issue - _unsafeBatchPerCollection?: boolean, + _unsafeBatchPerCollection?: boolean; // Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code. // This can only be used on initial (login) sync, not for incremental syncs. // This can only be used with SQLiteAdapter with JSI enabled. // The exact API may change between versions of WatermelonDB. // See documentation for more details. - unsafeTurbo?: boolean, - // Called after changes are pulled with whatever was returned by pullChanges, minus `changes`. Useful + unsafeTurbo?: boolean; + // Called after pullChanges with whatever was returned by pullChanges, minus `changes`. Useful // when using turbo mode - onDidPullChanges?: (Object) => Promise, + onDidPullChanges?: (_: Object) => Promise; // Called after pullChanges is done, but before these changes are applied. Some stats about the pulled // changes are passed as arguments. An advanced user can use this for example to show some UI to the user // when processing a very large sync (could be useful for replacement syncs). Note that remote change count // is NaN in turbo mode. - onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise, + onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise; }> /** @@ -130,6 +170,21 @@ export async function synchronize(args: SyncArgs): Promise { } } +/** + * Does database push-only synchronize with a remote server + * + * See docs for more details + */ +export async function optimisticSyncPush(args: OptimisticSyncPushArgs): Promise { + try { + const optimisticSyncPushImpl = require('./impl/synchronize').optimisticSyncPush + await optimisticSyncPushImpl(args) + } catch (error) { + args.log && (args.log.error = error) + throw error + } +} + /** * Returns `true` if database has any unsynced changes. *