Skip to content

Commit

Permalink
Introduce Changes() method
Browse files Browse the repository at this point in the history
DeleteTracker() is confusing to users. It is essentially just providing
an iterator for updates and deletes, so let's call it that: Changes().

A functional difference from the previous behavior is that Changes() does
not allow "reiterating" the way DeleteTracker's IterateWithError did, which
stopped on error and allowed picking up iteration again to retry. That is
not needed with Changes(), as the caller can retry the failed event itself
before iterating again.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Apr 17, 2024
1 parent c7b7b15 commit 7bac765
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 236 deletions.
24 changes: 12 additions & 12 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func BenchmarkDB_Baseline_Hashmap_Lookup(b *testing.B) {
b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_DeleteTracker_Baseline(b *testing.B) {
func BenchmarkDB_Changes_Baseline(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand All @@ -290,13 +290,13 @@ func BenchmarkDB_DeleteTracker_Baseline(b *testing.B) {
b.ReportMetric(float64(b.N*numObjectsToInsert)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_DeleteTracker(b *testing.B) {
func BenchmarkDB_Changes(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{})
b.ResetTimer()
for n := 0; n < b.N; n++ {
// Create objects
txn := db.WriteTxn(table)
dt, err := table.DeleteTracker(txn, "test")
iter, err := table.Changes(txn)
require.NoError(b, err)
for i := 0; i < numObjectsToInsert; i++ {
_, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil})
Expand All @@ -313,16 +313,16 @@ func BenchmarkDB_DeleteTracker(b *testing.B) {

// Iterate over the deleted objects
nDeleted := 0
dt.Iterate(
db.ReadTxn(),
func(obj testObject, deleted bool, _ Revision) {
if !deleted {
b.Fatalf("expected deleted for %v", obj)
}
<-iter.Watch(db.ReadTxn())
for ev, _, ok := iter.Next(); ok; ev, _, ok = iter.Next() {
if ev.Deleted {
nDeleted++
})
require.EqualValues(b, numObjectsToInsert, nDeleted)
dt.Close()
} else {
b.Fatalf("expected deleted for %v", ev)
}
}
require.EqualValues(b, nDeleted, numObjectsToInsert)
iter.Close()
}
b.StopTimer()
eventuallyGraveyardIsEmpty(b, db)
Expand Down
166 changes: 104 additions & 62 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package statedb
import (
"bytes"
"context"
"errors"
"expvar"
"fmt"
"log/slog"
Expand Down Expand Up @@ -260,7 +259,7 @@ func TestDB_Prefix(t *testing.T) {
require.Equal(t, Collect(Map(iter, testObject.getID)), []uint64{71, 82, 99})
}

func TestDB_DeleteTracker(t *testing.T) {
func TestDB_EventIterator(t *testing.T) {
t.Parallel()

db, table, metrics := newTestDB(t, tagsIndex)
Expand All @@ -277,13 +276,12 @@ func TestDB_DeleteTracker(t *testing.T) {
assert.EqualValues(t, 3, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// Create two delete trackers
wtxn := db.WriteTxn(table)
deleteTracker, err := table.DeleteTracker(wtxn, "test")
require.NoError(t, err, "failed to create DeleteTracker")
deleteTracker2, err := table.DeleteTracker(wtxn, "test2")
require.NoError(t, err, "failed to create DeleteTracker")
wtxn.Commit()
// Create two change iterators
txn := db.ReadTxn()
iter, err := table.Changes(txn)
require.NoError(t, err, "failed to create ChangeIterator")
iter2, err := table.Changes(txn)
require.NoError(t, err, "failed to create ChangeIterator")

assert.EqualValues(t, 2, expvarInt(metrics.DeleteTrackerCountVar.Get("test")), "DeleteTrackerCount")

Expand Down Expand Up @@ -313,9 +311,9 @@ func TestDB_DeleteTracker(t *testing.T) {
}

// 1 object should exist.
txn := db.ReadTxn()
iter, _ := table.All(txn)
objs := Collect(iter)
txn = db.ReadTxn()
iterAll, _ := table.All(txn)
objs := Collect(iterAll)
require.Len(t, objs, 1)

assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
Expand All @@ -324,57 +322,59 @@ func TestDB_DeleteTracker(t *testing.T) {
// Consume the deletions using the first delete tracker.
nExist := 0
nDeleted := 0
_, err = deleteTracker.IterateWithError(
txn,
func(obj testObject, deleted bool, _ Revision) error {
if deleted {
nDeleted++
} else {
nExist++
}
return nil
})
require.NoError(t, err)
require.Equal(t, nDeleted, 2)
require.Equal(t, nExist, 1)

// Since the second delete tracker has not processed the deletions,
// Observe the objects that existed when the tracker was created.
for ev, _, ok := iter.Next(); ok; ev, _, ok = iter.Next() {
if ev.Deleted {
nDeleted++
} else {
nExist++
}
}
assert.Equal(t, 0, nDeleted)
assert.Equal(t, 3, nExist)

// Wait for the new changes.
<-iter.Watch(txn)
for ev, _, ok := iter.Next(); ok; ev, _, ok = iter.Next() {
if ev.Deleted {
nDeleted++
nExist--
} else {
nExist++
}
}
assert.Equal(t, 2, nDeleted)
assert.Equal(t, 1, nExist)

// Since the second iterator has not processed the deletions,
// the graveyard index should still hold them.
require.False(t, db.graveyardIsEmpty())

// Consume the deletions using the second delete tracker, but
// with a failure first.
// Consume the deletions using the second iterator.
nExist = 0
nDeleted = 0
failErr := errors.New("fail")
_, err = deleteTracker2.IterateWithError(
txn,
func(obj testObject, deleted bool, _ Revision) error {
if deleted {
nDeleted++
return failErr
}

for ev, _, ok := iter2.Next(); ok; ev, _, ok = iter2.Next() {
if ev.Deleted {
nDeleted++
} else {
nExist++
return nil
})
require.ErrorIs(t, err, failErr)
require.Equal(t, nExist, 1) // Existing objects are iterated first.
require.Equal(t, nDeleted, 1)
nExist = 0
nDeleted = 0
}
}
<-iter2.Watch(txn)

// Process again, but this time using Iterate (retrying the failed revision)
_ = deleteTracker2.Iterate(
txn,
func(obj testObject, deleted bool, _ Revision) {
if deleted {
nDeleted++
} else {
nExist++
}
})
require.Equal(t, nDeleted, 2)
require.Equal(t, nExist, 0) // This was already processed.
for ev, _, ok := iter2.Next(); ok; ev, _, ok = iter2.Next() {
if ev.Deleted {
nDeleted++
nExist--
} else {
nExist++
}
}

assert.Equal(t, 1, nExist)
assert.Equal(t, 2, nDeleted)

// Graveyard will now be GCd.
eventuallyGraveyardIsEmpty(t, db)
Expand All @@ -383,9 +383,37 @@ func TestDB_DeleteTracker(t *testing.T) {
assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// After closing the first delete tracker, deletes are still tracked for second one.
// Delete the last remaining object.
deleteTracker.Close()
// Insert a new object and consume the event
{
wtxn := db.WriteTxn(table)
table.Insert(wtxn, testObject{ID: 88, Tags: []string{"foo"}})
wtxn.Commit()
}

txn = db.ReadTxn()
<-iter.Watch(txn)
<-iter2.Watch(txn)

ev, _, ok := iter.Next()
assert.True(t, ok)
assert.EqualValues(t, 88, ev.Object.ID)
assert.False(t, ev.Deleted)

ev, _, ok = iter2.Next()
assert.True(t, ok)
assert.EqualValues(t, 88, ev.Object.ID)
assert.False(t, ev.Deleted)

ev, _, ok = iter.Next()
assert.False(t, ok)

ev, _, ok = iter2.Next()
assert.False(t, ok)

// After closing the first iterator, deletes are still tracked for second one.
// Delete the remaining objects.
iter.Close()

{
txn := db.WriteTxn(table)
table.DeleteAll(txn)
Expand All @@ -394,16 +422,30 @@ func TestDB_DeleteTracker(t *testing.T) {
require.False(t, db.graveyardIsEmpty())

assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 1, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")
assert.EqualValues(t, 2, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

// Consume the deletions using the second iterator.
txn = db.ReadTxn()
<-iter2.Watch(txn)

ev, _, ok = iter2.Next()
assert.True(t, ok)
assert.True(t, ev.Deleted)

ev, _, ok = iter2.Next()
assert.True(t, ok)
assert.True(t, ev.Deleted)

ev, _, ok = iter2.Next()
assert.False(t, ok)

// And finally after closing the second tracker deletions are no longer tracked.
deleteTracker2.Mark(table.Revision(db.ReadTxn()))
eventuallyGraveyardIsEmpty(t, db)

assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount")

deleteTracker2.Close()
// After closing the second iterator the deletions no longer go into graveyard.
iter2.Close()
{
txn := db.WriteTxn(table)
table.Insert(txn, testObject{ID: 78, Tags: []string{"world"}})
Expand Down
64 changes: 6 additions & 58 deletions deletetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/cilium/statedb/index"
)

type DeleteTracker[Obj any] struct {
type deleteTracker[Obj any] struct {
db *DB
trackerName string
table Table[Obj]
Expand All @@ -22,28 +22,28 @@ type DeleteTracker[Obj any] struct {

// setRevision is called to set the starting low watermark when
// this deletion tracker is inserted into the table.
func (dt *DeleteTracker[Obj]) setRevision(rev uint64) {
func (dt *deleteTracker[Obj]) setRevision(rev uint64) {
dt.revision.Store(rev)
}

// getRevision is called by the graveyard garbage collector to
// compute the global low watermark.
func (dt *DeleteTracker[Obj]) getRevision() uint64 {
func (dt *deleteTracker[Obj]) getRevision() uint64 {
return dt.revision.Load()
}

// Deleted returns an iterator for deleted objects in this table starting from
// 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is
// called!
func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] {
func (dt *deleteTracker[Obj]) deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] {
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos)
iter := indexTxn.LowerBound(index.Uint64(minRevision))
return &iterator[Obj]{iter}
}

// Mark the revision up to which deleted objects have been processed. This sets
// the low watermark for deleted object garbage collection.
func (dt *DeleteTracker[Obj]) Mark(upTo Revision) {
func (dt *deleteTracker[Obj]) mark(upTo Revision) {
// Store the new low watermark and trigger a round of garbage collection.
dt.revision.Store(upTo)
select {
Expand All @@ -52,7 +52,7 @@ func (dt *DeleteTracker[Obj]) Mark(upTo Revision) {
}
}

func (dt *DeleteTracker[Obj]) Close() {
func (dt *deleteTracker[Obj]) close() {
// Remove the delete tracker from the table.
txn := dt.db.WriteTxn(dt.table).getTxn()
db := txn.db
Expand All @@ -71,58 +71,6 @@ func (dt *DeleteTracker[Obj]) Close() {
case db.gcTrigger <- struct{}{}:
default:
}

}

// IterateWithError iterates updates and deletes to a table in revision order.
//
// The 'processFn' is called for each updated or deleted object in order. If an error
// is returned by the function the iteration is stopped and the error is returned.
// On further calls the processing continues from the next unprocessed (or error'd) revision.
func (dt *DeleteTracker[Obj]) IterateWithError(txn ReadTxn, processFn func(obj Obj, deleted bool, rev Revision) error) (<-chan struct{}, error) {
upTo := dt.table.Revision(txn)
lastRevision := dt.revision.Load()

// Get all new and updated objects with revision number equal or
// higher than 'minRevision'.
// The returned watch channel watches the whole table and thus
// is closed when either insert or delete happens.
updatedIter, watch := dt.table.LowerBound(txn, ByRevision[Obj](lastRevision+1))

// Get deleted objects with revision equal or higher than 'minRevision'.
deletedIter := dt.Deleted(txn.getTxn(), lastRevision+1)

// Combine the iterators into one. This can be done as insert and delete
// both assign the object a new fresh monotonically increasing revision
// number.
iter := NewDualIterator[Obj](deletedIter, updatedIter)

for obj, rev, isDeleted, ok := iter.Next(); ok; obj, rev, isDeleted, ok = iter.Next() {
err := processFn(obj, isDeleted, rev)
if err != nil {
// Mark deleted objects processed up to previous revision since we may
// not have processed all objects with this revision fully yet.
dt.Mark(rev - 1)

// Processing failed, stop here and try again from this same revision.
return closedWatchChannel, err
}

}

// Fully processed up to latest table revision. GC deleted objects
// and return the next revision.
dt.Mark(upTo)
return watch, nil
}

// Iterate over updated and deleted objects in revision order.
func (dt *DeleteTracker[Obj]) Iterate(txn ReadTxn, iterateFn func(obj Obj, deleted bool, rev Revision)) <-chan struct{} {
watch, _ := dt.IterateWithError(txn, func(obj Obj, deleted bool, rev Revision) error {
iterateFn(obj, deleted, rev)
return nil
})
return watch
}

var closedWatchChannel = func() <-chan struct{} {
Expand Down
Loading

0 comments on commit 7bac765

Please sign in to comment.