Skip to content

Commit 543e1c1

Browse files
committed
Revert back to using WriteTxn as argument to Changes()
Using a ReadTxn and then optionally doing a WriteTxn is Changes() was, in hindsight, very racy. The iterator must be seeded in the same WriteTxn in which the delete tracker is registered as otherwise we may end up seeing inserted objects that were deleted and missing the delete changes. Signed-off-by: Jussi Maki <[email protected]>
1 parent cad64cb commit 543e1c1

File tree

8 files changed

+50
-51
lines changed

8 files changed

+50
-51
lines changed

benchmarks_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,13 @@ func BenchmarkDB_Changes(b *testing.B) {
295295
b.ResetTimer()
296296
for n := 0; n < b.N; n++ {
297297
// Create the change iterator.
298-
iter, err := table.Changes(db.ReadTxn())
298+
txn := db.WriteTxn(table)
299+
iter, err := table.Changes(txn)
300+
txn.Commit()
299301
require.NoError(b, err)
300302

301303
// Create objects
302-
txn := db.WriteTxn(table)
304+
txn = db.WriteTxn(table)
303305
for i := 0; i < numObjectsToInsert; i++ {
304306
_, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil})
305307
if err != nil {

db_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,11 +277,12 @@ func TestDB_EventIterator(t *testing.T) {
277277
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")
278278

279279
// Create two change iterators
280-
txn := db.ReadTxn()
281-
iter, err := table.Changes(txn)
280+
wtxn := db.WriteTxn(table)
281+
iter, err := table.Changes(wtxn)
282282
require.NoError(t, err, "failed to create ChangeIterator")
283-
iter2, err := table.Changes(txn)
283+
iter2, err := table.Changes(wtxn)
284284
require.NoError(t, err, "failed to create ChangeIterator")
285+
wtxn.Commit()
285286

286287
assert.EqualValues(t, 2, expvarInt(metrics.DeleteTrackerCountVar.Get("test")), "DeleteTrackerCount")
287288

@@ -311,7 +312,7 @@ func TestDB_EventIterator(t *testing.T) {
311312
}
312313

313314
// 1 object should exist.
314-
txn = db.ReadTxn()
315+
txn := db.ReadTxn()
315316
iterAll, _ := table.All(txn)
316317
objs := Collect(iterAll)
317318
require.Len(t, objs, 1)

derive.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ type derive[In, Out any] struct {
7070

7171
func (d derive[In, Out]) loop(ctx context.Context, health cell.Health) error {
7272
out := d.OutTable
73-
iter, err := d.InTable.Changes(d.DB.ReadTxn())
73+
txn := d.DB.WriteTxn(d.InTable)
74+
iter, err := d.InTable.Changes(txn)
75+
txn.Commit()
7476
if err != nil {
7577
return err
7678
}

fuzz_test.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -371,8 +371,9 @@ func randomAction() action {
371371

372372
func trackerWorker(i int, stop <-chan struct{}) {
373373
log := newDebugLogger(900 + i)
374-
txn := fuzzDB.ReadTxn()
375-
iter, err := tableFuzz1.Changes(txn)
374+
wtxn := fuzzDB.WriteTxn(tableFuzz1)
375+
iter, err := tableFuzz1.Changes(wtxn)
376+
wtxn.Commit()
376377
if err != nil {
377378
panic(err)
378379
}
@@ -381,6 +382,7 @@ func trackerWorker(i int, stop <-chan struct{}) {
381382
// Keep track of what state the changes lead us to in order to validate it.
382383
state := map[uint64]*statedb.Change[fuzzObj]{}
383384

385+
var txn statedb.ReadTxn
384386
var prevRev statedb.Revision
385387
for {
386388
for change, rev, ok := iter.Next(); ok; change, rev, ok = iter.Next() {
@@ -406,31 +408,34 @@ func trackerWorker(i int, stop <-chan struct{}) {
406408
state[change.Object.id] = &change
407409
}
408410
}
409-
// Validate that the observed changes match with the database state at this
410-
// snapshot.
411-
state2 := maps.Clone(state)
412-
iterAll, _ := tableFuzz1.All(txn)
413-
for obj, rev, ok := iterAll.Next(); ok; obj, rev, ok = iterAll.Next() {
414-
change, found := state[obj.id]
415-
if !found {
416-
panic(fmt.Sprintf("trackerWorker: object %d not found from state", obj.id))
417-
}
418411

419-
if change.Revision != rev {
420-
panic(fmt.Sprintf("trackerWorker: last observed revision %d does not match real revision %d", change.Revision, rev))
421-
}
412+
if txn != nil {
413+
// Validate that the observed changes match with the database state at this
414+
// snapshot.
415+
state2 := maps.Clone(state)
416+
iterAll, _ := tableFuzz1.All(txn)
417+
for obj, rev, ok := iterAll.Next(); ok; obj, rev, ok = iterAll.Next() {
418+
change, found := state[obj.id]
419+
if !found {
420+
panic(fmt.Sprintf("trackerWorker: object %d not found from state", obj.id))
421+
}
422+
423+
if change.Revision != rev {
424+
panic(fmt.Sprintf("trackerWorker: last observed revision %d does not match real revision %d", change.Revision, rev))
425+
}
422426

423-
if change.Object.value != obj.value {
424-
panic(fmt.Sprintf("trackerWorker: observed value %d does not match real value %d", change.Object.value, obj.value))
427+
if change.Object.value != obj.value {
428+
panic(fmt.Sprintf("trackerWorker: observed value %d does not match real value %d", change.Object.value, obj.value))
429+
}
430+
delete(state2, obj.id)
425431
}
426-
delete(state2, obj.id)
427-
}
428432

429-
if len(state) != tableFuzz1.NumObjects(txn) {
430-
for id := range state2 {
431-
fmt.Printf("%d should not exist\n", id)
433+
if len(state) != tableFuzz1.NumObjects(txn) {
434+
for id := range state2 {
435+
fmt.Printf("%d should not exist\n", id)
436+
}
437+
panic(fmt.Sprintf("trackerWorker: state size mismatch: %d vs %d", len(state), tableFuzz1.NumObjects(txn)))
432438
}
433-
panic(fmt.Sprintf("trackerWorker: state size mismatch: %d vs %d", len(state), tableFuzz1.NumObjects(txn)))
434439
}
435440

436441
txn = fuzzDB.ReadTxn()
@@ -524,7 +529,7 @@ func TestDB_Fuzz(t *testing.T) {
524529
}()
525530
// Delay a bit to start the trackers at different points in time
526531
// so they will observe a different starting state.
527-
time.Sleep(100 * time.Millisecond)
532+
time.Sleep(500 * time.Millisecond)
528533
}
529534

530535
// Wait until the mutation workers stop and then stop

observable.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ type observable[Obj any] struct {
2525

2626
func (to *observable[Obj]) Observe(ctx context.Context, next func(Change[Obj]), complete func(error)) {
2727
go func() {
28-
iter, err := to.table.Changes(to.db.ReadTxn())
28+
txn := to.db.WriteTxn(to.table)
29+
iter, err := to.table.Changes(txn)
30+
txn.Commit()
2931
if err != nil {
3032
complete(err)
3133
return

table.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -318,30 +318,13 @@ func (t *genTable[Obj]) DeleteAll(txn WriteTxn) error {
318318
return nil
319319
}
320320

321-
func (t *genTable[Obj]) Changes(txn ReadTxn) (ChangeIterator[Obj], error) {
321+
func (t *genTable[Obj]) Changes(txn WriteTxn) (ChangeIterator[Obj], error) {
322322
iter := &changeIterator[Obj]{
323323
revision: 0,
324324
table: t,
325325
}
326326

327-
// Check that 'txn' is not a 'WriteTxn' against this table.
328-
for _, e := range txn.getTxn().modifiedTables {
329-
if e != nil && e.meta.Name() == t.table {
330-
// 'txn' is a WriteTxn against the same table. Refuse this as the snapshot
331-
// to seed the initial iterator from.
332-
//
333-
// The user might use the same transaction to insert or delete objects
334-
// and it might be confusing as these changes may or may not be observed
335-
// depending on whether Changes() was called before Insert() or Delete(),
336-
// and since this is an unexpected use-case it's better to just reject this.
337-
return nil, fmt.Errorf("Changes() cannot be called with a write transaction against table %q", t.table)
338-
}
339-
}
340-
341-
// Create a WriteTxn to add the delete tracker.
342-
itxn := txn.getTxn().db.WriteTxn(t).getTxn()
343-
defer itxn.Commit()
344-
327+
itxn := txn.getTxn()
345328
name := fmt.Sprintf("iterator-%p", iter)
346329
iter.dt = &deleteTracker[Obj]{
347330
db: itxn.db,

txn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt anyDelet
242242
return ErrTransactionClosed
243243
}
244244
table := txn.modifiedTables[meta.tablePos()]
245+
if table == nil {
246+
return tableError(meta.Name(), ErrTableNotLockedForWriting)
247+
}
248+
245249
_, _, table.deleteTrackers = table.deleteTrackers.Insert([]byte(trackerName), dt)
246250
txn.db.metrics.DeleteTrackerCount(meta.Name(), table.deleteTrackers.Len())
247251

types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ type Table[Obj any] interface {
7171
//
7272
// If an object is created and deleted before the observer has iterated
7373
// over the creation then only the deletion is seen.
74-
Changes(txn ReadTxn) (ChangeIterator[Obj], error)
74+
Changes(WriteTxn) (ChangeIterator[Obj], error)
7575
}
7676

7777
// Change is either an update or a delete of an object. Used by Changes() and

0 commit comments

Comments
 (0)