Skip to content

Commit

Permalink
Merge pull request #1354 from undb-xyz/fix/1352
Browse files Browse the repository at this point in the history
fix: fix event partial update
  • Loading branch information
nichenqin committed Jul 21, 2023
2 parents bb5cd83 + 41d5dec commit c832205
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 18 deletions.
15 changes: 12 additions & 3 deletions packages/core/src/table/record/events/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { z } from 'zod'
import { Table } from '../../table.js'
import type { Table } from '../../table.js'
import { createRecordReadableValueSchema } from '../record.readable.js'
import {
EVT_RECORD_BULK_CREATED,
Expand Down Expand Up @@ -89,13 +89,22 @@ export const createRecordEventReadableValueSchema = (table: Table) => {
),
recordUpdatedEvent.merge(
z.object({
payload: recordUpdatedEventPayload.merge(z.object({ previousRecord: record, record })),
payload: recordUpdatedEventPayload.merge(
z.object({
previousRecord: record.partial(),
record: record.partial(),
}),
),
}),
),
recordDeletedEvent,
recordsBulkCreatedEvent.merge(
z.object({
payload: recordsBulkCreatedEventPayload.merge(z.object({ records: record.array() })),
payload: recordsBulkCreatedEventPayload.merge(
z.object({
records: record.partial().array(),
}),
),
}),
),
recordsBulkUpdatedEvent.merge(
Expand Down
23 changes: 16 additions & 7 deletions packages/core/src/table/record/events/record-bulk-updated.event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,30 @@ export class RecordBulkUpdatedEvent extends BaseEvent<IRecordsBulkUpdatedEventPa
operatorId: string,
previousRecords: IQueryRecordSchema[],
records: IQueryRecordSchema[],
updatedFieldIds: Map<string, Set<string>>,
): RecordBulkUpdatedEvent {
const schema = table.schema.toEvent(records)
const previousSchema = previousTable.isSome() ? previousTable.unwrap().schema.toEvent(records) : null
const fields = table.schema.fields
const recordsMap = new Map(records.map((r) => [r.id, r]))
const fieldIds = new Set([...updatedFieldIds.values()].flatMap((f) => [...f.values()]))
const schema = table.schema.fields.filter((f) => fieldIds.has(f.id.value)).map((f) => f.toEvent(records))
const previousSchema = previousTable.isSome()
? previousTable
.unwrap()
.schema.fields.filter((f) => fieldIds.has(f.id.value))
.map((f) => f.toEvent(previousRecords))
: null
return new this(
{
schema,
previousSchema,
tableId: table.id.value,
tableName: table.name.value,
updates: previousRecords.map((r) => ({
previousRecord: recordReadableMapper(fields, r),
record: recordReadableMapper(fields, recordsMap.get(r.id)!),
})),
updates: previousRecords.map((r) => {
const fields = table.schema.fields.filter((f) => !!updatedFieldIds.get(r.id)?.has(f.id.value))
return {
previousRecord: recordReadableMapper(fields, r),
record: recordReadableMapper(fields, recordsMap.get(r.id)!),
}
}),
},
operatorId,
)
Expand Down
16 changes: 12 additions & 4 deletions packages/core/src/table/record/events/record-updated.event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,23 @@ export class RecordUpdatedEvent extends BaseEvent<IRecordUpdatedEventPayload, Ba
operatorId: string,
previousRecord: IQueryRecordSchema,
record: IQueryRecordSchema,
updatedFieldIds: Set<string>,
): RecordUpdatedEvent {
const fields = table.schema.fields.filter((f) => updatedFieldIds.has(f.id.value))
const fieldIds = new Set(fields.map((f) => f.id.value))
return new this(
{
tableId: table.id.value,
tableName: table.name.value,
previousSchema: previousTable.isSome() ? previousTable.unwrap().schema.toEvent([previousRecord]) : null,
previousRecord: recordReadableMapper(table.schema.fields, previousRecord),
schema: table.schema.toEvent([record]),
record: recordReadableMapper(table.schema.fields, record),
previousSchema: previousTable.isSome()
? previousTable
.unwrap()
.schema.fields.filter((f) => fieldIds.has(f.id.value))
.map((f) => f.toEvent([previousRecord]))
: null,
previousRecord: recordReadableMapper(fields, previousRecord),
schema: table.schema.fields.filter((f) => fieldIds.has(f.id.value)).map((f) => f.toEvent([previousRecord])),
record: recordReadableMapper(fields, record),
},
operatorId,
)
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/table/record/record.readable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import {
idReadableValueSchema,
jsonReadableValueSchema,
lookupReadableValueSchema,
maxReadableValueSchema,
minReadableValueSchema,
numberReadableValueSchema,
parentReadableValueSchema,
ratingReadableValueSchema,
Expand All @@ -38,8 +40,6 @@ import {
treeReadableValueSchema,
updatedAtReadableValueSchema,
updatedByReadableValueSchema,
minReadableValueSchema,
maxReadableValueSchema,
type ReferenceField,
} from '../field/index.js'
import type { Table } from '../table.js'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export class RecordSqliteMutationVisitor extends BaseEntityManager implements IR
) {
super(em)
}

public readonly updatedFieldIds = new Set<string>()

dateRangeDateEqual(s: DateRangeDateEqual): void {
throw new Error('Method not implemented.')
}
Expand Down Expand Up @@ -143,6 +146,13 @@ export class RecordSqliteMutationVisitor extends BaseEntityManager implements IR
}
values(s: WithRecordValues): void {
for (const [fieldId, value] of s.values) {
const field = this.schema.get(fieldId)
if (!field) continue

if (!field.controlled && !field.system) {
this.updatedFieldIds.add(fieldId)
}

const valueVisitor = this.createRecordValueVisitor(fieldId)

value.accept(valueVisitor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ export class RecordSqliteRepository implements IRecordRepository {
spec.accept(mv)

await mv.commit()

return mv
}

async updateOneById(table: CoreTable, id: string, spec: IRecordSpec): Promise<void> {
Expand All @@ -319,7 +321,7 @@ export class RecordSqliteRepository implements IRecordRepository {
const previousRecord = await this.findOneRecordEntity(tableId, idSpec, em)
if (!previousRecord) throw new Error('record not found')

await this._update(this.em, tableId, schema, id, spec)
const visitor = await this._update(this.em, tableId, schema, id, spec)

const record = await this.findOneRecordEntity(tableId, idSpec, em)

Expand All @@ -330,6 +332,7 @@ export class RecordSqliteRepository implements IRecordRepository {
userId,
RecordSqliteMapper.toQuery(tableId, schema, previousRecord),
RecordSqliteMapper.toQuery(tableId, schema, record),
visitor.updatedFieldIds,
)

this.outboxService.persist(event)
Expand All @@ -355,7 +358,17 @@ export class RecordSqliteRepository implements IRecordRepository {
const em = this.em

try {
await Promise.all(updates.map((update) => this._update(em, tableId, schema, update.id, update.spec)))
const updated = await Promise.all(
updates.map(async (update) => {
const visitor = await this._update(em, tableId, schema, update.id, update.spec)
return { id: update.id, visitor }
}),
)

const updatedFields = new Map()
for (const update of updated) {
updatedFields.set(update.id, update.visitor.updatedFieldIds)
}

const records = await this.findRecordsEntity(tableId, idsSpec)
const event = RecordBulkUpdatedEvent.from(
Expand All @@ -364,6 +377,7 @@ export class RecordSqliteRepository implements IRecordRepository {
userId,
RecordSqliteMapper.toQueries(tableId, schema, previousRecords),
RecordSqliteMapper.toQueries(tableId, schema, records),
updatedFields,
)
this.outboxService.persist(event)
await this.uow.commit()
Expand Down

0 comments on commit c832205

Please sign in to comment.