Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supported experimentalAcceptedIds as safer alternative to rejectedIds #1552

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/sync/impl/__tests__/markAsSynced.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ describe('markLocalChangesAsSynced', () => {
})

// test that second push will mark all as synced
await markLocalChangesAsSynced(database, localChanges2)
await markLocalChangesAsSynced(database, localChanges2, false)
expect(destroyDeletedRecordsSpy).toHaveBeenCalledTimes(2)
expect(await fetchLocalChanges(database)).toEqual(emptyLocalChanges)

Expand All @@ -146,7 +146,7 @@ describe('markLocalChangesAsSynced', () => {
const localChanges = await fetchLocalChanges(database)

// mark as synced
await markLocalChangesAsSynced(database, localChanges, {
await markLocalChangesAsSynced(database, localChanges, false, {
mock_projects: ['pCreated1', 'pUpdated'],
mock_comments: ['cDeleted'],
})
Expand All @@ -161,6 +161,30 @@ describe('markLocalChangesAsSynced', () => {
)
expect(await allDeletedRecords([comments])).toEqual(['cDeleted'])
})
it(`marks only acceptedIds as synced`, async () => {
const { database, comments } = makeDatabase()

const { pCreated1, pUpdated } = await makeLocalChanges(database)
const localChanges = await fetchLocalChanges(database)

// mark as synced
await markLocalChangesAsSynced(database, localChanges, true, {}, {
// probably better solution exists (we essentially list all but expected in verify)
mock_projects: ['pCreated2', 'pDeleted'],
mock_comments: ['cCreated', 'cUpdated'],
mock_tasks: ['tCreated', 'tUpdated', 'tDeleted'],
})

// verify
const localChanges2 = await fetchLocalChanges(database)
expect(localChanges2.changes).toEqual(
makeChangeSet({
mock_projects: { created: [pCreated1._raw], updated: [pUpdated._raw] },
mock_comments: { deleted: ['cDeleted'] },
}),
)
expect(await allDeletedRecords([comments])).toEqual(['cDeleted'])
})
it(`can mark records as synced when ids are per-table not globally unique`, async () => {
const { database, projects, tasks, comments } = makeDatabase()

Expand Down
67 changes: 67 additions & 0 deletions src/sync/impl/__tests__/synchronize-partialRejections.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,71 @@ describe('synchronize - partial push rejections', () => {
}),
)
})
it(`can partially accept a push`, async () => {
const { database } = makeDatabase()

const { tCreated, tUpdated } = await makeLocalChanges(database)

const acceptedIds = Object.freeze({
// probably better solution exists (we essentially list all but expected in expect below)
mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'],
mock_comments: ['cCreated', 'cUpdated'],
mock_tasks: ['tDeleted'],
})
const rejectedIds = Object.freeze({
mock_tasks: ['tCreated', 'tUpdated'],
mock_comments: ['cDeleted'],
})
const log = {}
await synchronize({
database,
pullChanges: jest.fn(emptyPull()),
pushChanges: jest.fn(() => ({ experimentalAcceptedIds: acceptedIds })),
pushShouldConfirmOnlyAccepted: true,
log,
})
expect((await fetchLocalChanges(database)).changes).toEqual(
makeChangeSet({
mock_tasks: { created: [tCreated._raw], updated: [tUpdated._raw] },
mock_comments: { deleted: ['cDeleted'] },
}),
)
expect(log.rejectedIds).toStrictEqual(rejectedIds)
})
it(`can partially accept a push and make changes during push`, async () => {
const { database, comments } = makeDatabase()

const { pCreated1, tUpdated } = await makeLocalChanges(database)
const pCreated1Raw = { ...pCreated1._raw }
let newComment
await synchronize({
database,
pullChanges: jest.fn(emptyPull()),
pushChanges: jest.fn(async () => {
await database.write(async () => {
await pCreated1.update((p) => {
p.name = 'updated!'
})
newComment = await comments.create((c) => {
c.body = 'bazinga'
})
})
return {
experimentalAcceptedIds: {
mock_projects: ['pCreated1', 'pCreated2', 'pDeleted', 'pUpdated'],
mock_comments: ['cCreated', 'cUpdated'],
mock_tasks: ['tCreated', 'tDeleted'],
},
}
}),
pushShouldConfirmOnlyAccepted: true,
})
expect((await fetchLocalChanges(database)).changes).toEqual(
makeChangeSet({
mock_projects: { created: [{ ...pCreated1Raw, _changed: 'name', name: 'updated!' }] },
mock_tasks: { updated: [tUpdated._raw] },
mock_comments: { created: [newComment._raw], deleted: ['cDeleted'] },
}),
)
})
})
52 changes: 50 additions & 2 deletions src/sync/impl/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import { values } from '../../utils/fp'
import areRecordsEqual from '../../utils/fp/areRecordsEqual'
import { invariant } from '../../utils/common'

import type { Model, Collection, Database } from '../..'
import type { Model, Collection, Database, TableName, RecordId } from '../..'
import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../../RawRecord'
import type { SyncLog, SyncDatabaseChangeSet, SyncConflictResolver } from '../index'
import type {
SyncIds,
SyncLog,
SyncDatabaseChangeSet,
SyncConflictResolver,
} from '../index'

// Returns raw record with naive solution to a conflict based on local `_changed` field
// This is a per-column resolution algorithm. All columns that were changed locally win
Expand Down Expand Up @@ -148,3 +153,46 @@ export const changeSetCount: (SyncDatabaseChangeSet) => number = (changeset) =>
({ created, updated, deleted }) => created.length + updated.length + deleted.length,
),
)

const extractChangeSetIds: (SyncDatabaseChangeSet) => { [TableName<any>]: RecordId[] } = (changeset) =>
Object.keys(changeset).reduce((acc: { [TableName<any>]: RecordId[] }, key: string) => {
// $FlowFixMe
const { created, updated, deleted } = changeset[key]
// $FlowFixMe
acc[key] = [
...created.map(it => it.id),
...updated.map(it => it.id),
...deleted,
]
return acc
}, {})

// Returns all rejected ids and is used when accepted ids are used
export const findRejectedIds:
(?SyncIds, ?SyncIds, SyncDatabaseChangeSet) => SyncIds =
(experimentalRejectedIds, experimentalAcceptedIds, changeset) => {
const localIds = extractChangeSetIds(changeset)

const acceptedIdsSets = Object.keys(changeset).reduce(
(acc: { [TableName<any>]: Set<RecordId> }, key: string) => {
// $FlowFixMe
acc[key] = new Set(experimentalAcceptedIds[key])
return acc
}, {})

return Object.keys(changeset).reduce((acc: { [TableName<any>]: RecordId[] }, key: string) => {
const rejectedIds = [
// $FlowFixMe
...(experimentalRejectedIds ? experimentalRejectedIds[key] || [] : []),
// $FlowFixMe
...(localIds[key] || []),
// $FlowFixMe
].filter(it => !acceptedIdsSets[key].has(it))

if (rejectedIds.length > 0) {
// $FlowFixMe
acc[key] = rejectedIds
}
return acc
}, {})
}
6 changes: 4 additions & 2 deletions src/sync/impl/markAsSynced.d.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Database, Model, TableName } from '../..'

import type { SyncLocalChanges, SyncRejectedIds } from '../index'
import type { SyncLocalChanges, SyncIds } from '../index'

export default function markLocalChangesAsSynced(
db: Database,
syncedLocalChanges: SyncLocalChanges,
rejectedIds?: SyncRejectedIds,
allowOnlyAcceptedIds: boolean,
rejectedIds?: SyncIds,
allAcceptedIds?: SyncIds,
): Promise<void>
28 changes: 20 additions & 8 deletions src/sync/impl/markAsSynced.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import { logError } from '../../utils/common'
import type { Database, Model, TableName } from '../..'

import { prepareMarkAsSynced } from './helpers'
import type { SyncLocalChanges, SyncRejectedIds } from '../index'
import type { SyncLocalChanges, SyncIds } from '../index'

const recordsToMarkAsSynced = (
{ changes, affectedRecords }: SyncLocalChanges,
allRejectedIds: SyncRejectedIds,
allowOnlyAcceptedIds: boolean,
allRejectedIds: SyncIds,
allAcceptedIds: SyncIds,
): Model[] => {
const syncedRecords = []

Object.keys(changes).forEach((table) => {
const { created, updated } = changes[(table: any)]
const raws = created.concat(updated)
const rejectedIds = new Set(allRejectedIds[(table: any)])
const acceptedIds = new Set(allAcceptedIds[(table: any)] || [])

raws.forEach((raw) => {
const { id } = raw
Expand All @@ -27,7 +30,8 @@ const recordsToMarkAsSynced = (
)
return
}
if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id)) {
const isAccepted = !allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id)
if (areRecordsEqual(record._raw, raw) && !rejectedIds.has(id) && isAccepted) {
syncedRecords.push(record)
}
})
Expand All @@ -38,27 +42,35 @@ const recordsToMarkAsSynced = (
const destroyDeletedRecords = (
db: Database,
{ changes }: SyncLocalChanges,
allRejectedIds: SyncRejectedIds,
allowOnlyAcceptedIds: boolean,
allRejectedIds: SyncIds,
allAcceptedIds: SyncIds,
): Promise<any>[] =>
Object.keys(changes).map((_tableName) => {
const tableName: TableName<any> = (_tableName: any)
const rejectedIds = new Set(allRejectedIds[tableName])
const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id))
const acceptedIds = new Set(allAcceptedIds[tableName] || [])
const deleted = changes[tableName].deleted.filter((id) => !rejectedIds.has(id) &&
(!allAcceptedIds || !allowOnlyAcceptedIds || acceptedIds.has(id)))
return deleted.length ? db.adapter.destroyDeletedRecords(tableName, deleted) : Promise.resolve()
})

export default function markLocalChangesAsSynced(
db: Database,
syncedLocalChanges: SyncLocalChanges,
rejectedIds?: ?SyncRejectedIds,
allowOnlyAcceptedIds: boolean,
rejectedIds?: ?SyncIds,
allAcceptedIds?: ?SyncIds,
): Promise<void> {
return db.write(async () => {
// update and destroy records concurrently
await Promise.all([
db.batch(
recordsToMarkAsSynced(syncedLocalChanges, rejectedIds || {}).map(prepareMarkAsSynced),
recordsToMarkAsSynced(syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {},
allAcceptedIds || {}).map(prepareMarkAsSynced),
),
...destroyDeletedRecords(db, syncedLocalChanges, rejectedIds || {}),
...destroyDeletedRecords(db, syncedLocalChanges, allowOnlyAcceptedIds, rejectedIds || {},
allAcceptedIds || {}),
])
}, 'sync-markLocalChangesAsSynced')
}
1 change: 1 addition & 0 deletions src/sync/impl/synchronize.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export default function synchronize({
migrationsEnabledAtVersion,
log,
conflictResolver,
pushShouldConfirmOnlyAccepted,
_unsafeBatchPerCollection,
unsafeTurbo,
}: SyncArgs): Promise<void>
10 changes: 8 additions & 2 deletions src/sync/impl/synchronize.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
setLastPulledSchemaVersion,
getMigrationInfo,
} from './index'
import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers'
import { ensureSameDatabase, isChangeSetEmpty, changeSetCount, findRejectedIds } from './helpers'
import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index'

export default async function synchronize({
Expand All @@ -24,6 +24,7 @@ export default async function synchronize({
migrationsEnabledAtVersion,
log,
conflictResolver,
pushShouldConfirmOnlyAccepted,
_unsafeBatchPerCollection,
unsafeTurbo,
}: SyncArgs): Promise<void> {
Expand Down Expand Up @@ -134,9 +135,14 @@ export default async function synchronize({
(await pushChanges({ changes: localChanges.changes, lastPulledAt: newLastPulledAt })) || {}
log && (log.phase = 'pushed')
log && (log.rejectedIds = pushResult.experimentalRejectedIds)
if (log && pushShouldConfirmOnlyAccepted) {
log.rejectedIds = findRejectedIds(pushResult.experimentalRejectedIds,
pushResult.experimentalAcceptedIds, localChanges.changes)
}

ensureSameDatabase(database, resetCount)
await markLocalChangesAsSynced(database, localChanges, pushResult.experimentalRejectedIds)
await markLocalChangesAsSynced(database, localChanges, pushShouldConfirmOnlyAccepted || false,
pushResult.experimentalRejectedIds, pushResult.experimentalAcceptedIds)
log && (log.phase = 'marked local changes as synced')
}
} else {
Expand Down
16 changes: 13 additions & 3 deletions src/sync/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ export type SyncPullResult =
| $Exact<{ syncJson: string }>
| $Exact<{ syncJsonId: number }>

export type SyncRejectedIds = { [tableName: TableName<any>]: RecordId[] }
export type SyncIds = { [tableName: TableName<any>]: RecordId[] }

export type SyncRejectedIds = SyncIds

export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet; lastPulledAt: Timestamp }>

export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }>
export type SyncPushResult = $Exact<{
experimentalRejectedIds?: SyncIds,
experimentalAcceptedIds?: SyncIds,
}>

type SyncConflict = $Exact<{ local: DirtyRaw; remote: DirtyRaw; resolved: DirtyRaw }>
export type SyncLog = {
Expand All @@ -41,7 +46,7 @@ export type SyncLog = {
migration?: MigrationSyncChanges;
newLastPulledAt?: number;
resolvedConflicts?: SyncConflict[];
rejectedIds?: SyncRejectedIds;
rejectedIds?: SyncIds;
finishedAt?: Date;
remoteChangeCount?: number;
localChangeCount?: number;
Expand Down Expand Up @@ -70,6 +75,11 @@ export type SyncArgs = $Exact<{
// If you don't want to change default behavior for a given record, return `resolved` as is
// Note that it's safe to mutate `resolved` object, so you can skip copying it for performance.
conflictResolver?: SyncConflictResolver;
// experimental customization that will cause to only set records as synced if we return id.
// This will in turn cause all records to be re-pushed if id wasn't returned. This allows to
// "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that
// unpredicted error will cause data loss (when failed data push isn't re-pushed)
pushShouldConfirmOnlyAccepted?: boolean;
// commits changes in multiple batches, and not one - temporary workaround for memory issue
_unsafeBatchPerCollection?: boolean;
// Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code.
Expand Down
16 changes: 13 additions & 3 deletions src/sync/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ export type SyncPullResult =
| $Exact<{ syncJson: string }>
| $Exact<{ syncJsonId: number }>

export type SyncRejectedIds = { [TableName<any>]: RecordId[] }
export type SyncIds = { [TableName<any>]: RecordId[] }

export type SyncRejectedIds = SyncIds

export type SyncPushArgs = $Exact<{ changes: SyncDatabaseChangeSet, lastPulledAt: Timestamp }>

export type SyncPushResult = $Exact<{ experimentalRejectedIds?: SyncRejectedIds }>
export type SyncPushResult = $Exact<{
experimentalRejectedIds?: SyncIds,
experimentalAcceptedIds?: SyncIds,
}>

type SyncConflict = $Exact<{ local: DirtyRaw, remote: DirtyRaw, resolved: DirtyRaw }>
export type SyncLog = {
Expand All @@ -67,7 +72,7 @@ export type SyncLog = {
migration?: ?MigrationSyncChanges,
newLastPulledAt?: number,
resolvedConflicts?: SyncConflict[],
rejectedIds?: SyncRejectedIds,
rejectedIds?: SyncIds,
finishedAt?: Date,
remoteChangeCount?: number,
localChangeCount?: number,
Expand Down Expand Up @@ -97,6 +102,11 @@ export type SyncArgs = $Exact<{
// If you don't want to change default behavior for a given record, return `resolved` as is
// Note that it's safe to mutate `resolved` object, so you can skip copying it for performance.
conflictResolver?: SyncConflictResolver,
// experimental customization that will cause to only set records as synced if we return id.
// This will in turn cause all records to be re-pushed if id wasn't returned. This allows to
// "whitelisting" ids instead of "blacklisting" (rejectedIds) so that there is less chance that
// unpredicted error will cause data loss (when failed data push isn't re-pushed)
pushShouldConfirmOnlyAccepted?: boolean;
// commits changes in multiple batches, and not one - temporary workaround for memory issue
_unsafeBatchPerCollection?: boolean,
// Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code.
Expand Down