Skip to content

Commit

Permalink
Rename First to Get and Get to List/ListWatch
Browse files Browse the repository at this point in the history
The First() method name has been an eye sore from the start. This is
now a good point to rename it as we're about to switch Cilium to using
cilium/statedb and need to refactor a bunch of code anyway.

Before:

  obj, rev, found := table.First(txn, MyIndex.Query("foo"))
  obj, rev, found, watch := table.FirstWatch(txn, MyIndex.Query("foo"))

  iter, watch := table.Get(txn, MyIndex.Query("foo"))

Now:

  obj, rev, found := table.Get(txn, MyIndex.Query("foo"))
  obj, rev, found, watch := table.GetWatch(txn, MyIndex.Query("foo"))

  iter := table.List(txn, MyIndex.Query("foo"))
  iter, watch := table.ListWatch(txn, MyIndex.Query("foo"))

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed May 8, 2024
1 parent 44a4801 commit 814f9a1
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 86 deletions.
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func example() {
myObjects.Insert(wtxn, &MyObject{3, "c"})

// Modify an object
if obj, _, found := myObjects.First(wtxn, IDIndex.Query(1)); found {
if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(1)); found {
objCopy := *obj
objCopy.Foo = "d"
myObjects.Insert(wtxn, &objCopy)
}

// Delete an object
if obj, _, found := myObjects.First(wtxn, IDIndex.Query(2)); found {
if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(2)); found {
myObjects.Delete(wtxn, obj)
}

Expand All @@ -111,7 +111,7 @@ func example() {
// Query the objects with a snapshot of the database.
txn := db.ReadTxn()

if obj, _, found := myObjects.First(wtxn, IDIndex.Query(1)); found {
if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(1)); found {
...
}

Expand Down Expand Up @@ -314,33 +314,39 @@ var (
found bool
watch <-chan struct{}
)
// First returns the first matching object in the query.
obj, revision, found = myObjects.First(txn, IDIndex.Query(42))
// Get returns the first matching object in the query.
obj, revision, found = myObjects.Get(txn, IDIndex.Query(42))
if found {
// obj points to the object we inserted earlier.
// revision is the "table revision" for the object. Revisions are
// incremented for a table on every insertion or deletion.
}
// FirstWatch is the same as First, but also gives us a watch
// GetWatch is the same as Get, but also gives us a watch
// channel that we can use to wait on the object to appear or to
// change.
obj, revision, watch, found = myObjects.FirstWatch(txn, IDIndex.Query(42))
obj, revision, watch, found = myObjects.GetWatch(txn, IDIndex.Query(42))
<-watch // closes when object with ID '42' is inserted or deleted
```

### Iterating

`Get` can be used to iterate over all objects that match the query.
`List` can be used to iterate over all objects that match the query.

```go
var iter statedb.Iterator[*MyObject]
// Get returns all matching objects as an iterator. The iterator is lazy
// List returns all matching objects as an iterator. The iterator is lazy
// and one can stop reading at any time without worrying about the rest.
iter, watch = myObjects.Get(txn, TagsIndex.Query("hello"))
iter := myObjects.List(txn, TagsIndex.Query("hello"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
// ...
}
<-watch // closes when an object with tag "hello" is inserted or deleted

// ListWatch is like List, but also returns a watch channel.
iter, watch := myObjects.ListWatch(txn, TagsIndex.Query("hello"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() { ... }

// closes when an object with tag "hello" is inserted or deleted
<-watch
```

`Prefix` can be used to iterate over objects that match a given prefix.
Expand Down Expand Up @@ -461,7 +467,7 @@ wtxn := db.WriteTxn(myObjects)

// Now that we have the table written we can retrieve an object and none will
// be able to modify it until we commit.
obj, revision, found := myObjects.First(wtxn, IDIndex.Query(42))
obj, revision, found := myObjects.Get(wtxn, IDIndex.Query(42))
if !found { panic("it should be there, I swear!") }

// We cannot just straight up modify 'obj' since someone might be reading it.
Expand Down Expand Up @@ -498,7 +504,7 @@ txn := db.ReadTxn()

// Look up the object we want to update and perform some slow calculation
// to produce the desired new object.
obj, revision, found := myObjects.First(txn, IDIndex.Query(42))
obj, revision, found := myObjects.Get(txn, IDIndex.Query(42))
obj = veryExpensiveCalculation(obj)

// Now that we're ready to insert we can grab a WriteTxn.
Expand Down
6 changes: 3 additions & 3 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func BenchmarkDB_RandomLookup(b *testing.B) {
for j := 0; j < b.N; j++ {
txn := db.ReadTxn()
for _, q := range queries {
_, _, ok := table.First(txn, q)
_, _, ok := table.Get(txn, q)
if !ok {
b.Fatal("object not found")
}
Expand All @@ -319,7 +319,7 @@ func BenchmarkDB_SequentialLookup(b *testing.B) {
txn := db.ReadTxn()
for n := 0; n < b.N; n++ {
for _, q := range queries {
_, _, ok := table.First(txn, q)
_, _, ok := table.Get(txn, q)
if !ok {
b.Fatalf("Object not found")
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func BenchmarkDB_FullIteration_Get(b *testing.B) {
txn := db.ReadTxn()
for n := 0; n < b.N; n++ {
for _, q := range queries {
_, _, ok := table.First(txn, q)
_, _, ok := table.Get(txn, q)
if !ok {
b.Fatalf("Object not found")
}
Expand Down
66 changes: 33 additions & 33 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func TestDB_Revision(t *testing.T) {
require.Equal(t, writeRevision, readRevision, "committed transaction changed revision")
}

func TestDB_GetFirstLast(t *testing.T) {
func TestDB_GetList(t *testing.T) {
t.Parallel()

db, table, _ := newTestDB(t, tagsIndex)
Expand All @@ -597,49 +597,49 @@ func TestDB_GetFirstLast(t *testing.T) {
require.NoError(t, err)
}
// Check that we can query the not-yet-committed write transaction.
obj, rev, ok := table.First(txn, idIndex.Query(1))
require.True(t, ok, "expected First(1) to return result")
obj, rev, ok := table.Get(txn, idIndex.Query(1))
require.True(t, ok, "expected Get(1) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 1, "expected first obj.ID to equal 1")
txn.Commit()
}

txn := db.ReadTxn()

// Test Get against the ID index.
iter, _ := table.Get(txn, idIndex.Query(0))
// Test List against the ID index.
iter := table.List(txn, idIndex.Query(0))
items := Collect(iter)
require.Len(t, items, 0, "expected Get(0) to not return results")

iter, _ = table.Get(txn, idIndex.Query(1))
iter = table.List(txn, idIndex.Query(1))
items = Collect(iter)
require.Len(t, items, 1, "expected Get(1) to return result")
require.EqualValues(t, items[0].ID, 1, "expected items[0].ID to equal 1")

iter, getWatch := table.Get(txn, idIndex.Query(2))
iter, listWatch := table.ListWatch(txn, idIndex.Query(2))
items = Collect(iter)
require.Len(t, items, 1, "expected Get(2) to return result")
require.EqualValues(t, items[0].ID, 2, "expected items[0].ID to equal 2")

// Test First/FirstWatch against the ID index.
_, _, ok := table.First(txn, idIndex.Query(0))
require.False(t, ok, "expected First(0) to not return result")
// Test Get/GetWatch against the ID index.
_, _, ok := table.Get(txn, idIndex.Query(0))
require.False(t, ok, "expected Get(0) to not return result")

obj, rev, ok := table.First(txn, idIndex.Query(1))
require.True(t, ok, "expected First(1) to return result")
obj, rev, ok := table.Get(txn, idIndex.Query(1))
require.True(t, ok, "expected Get(1) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 1, "expected first obj.ID to equal 1")

obj, rev, firstWatch, ok := table.FirstWatch(txn, idIndex.Query(2))
require.True(t, ok, "expected FirstWatch(2) to return result")
obj, rev, getWatch, ok := table.GetWatch(txn, idIndex.Query(2))
require.True(t, ok, "expected GetWatch(2) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 2, "expected obj.ID to equal 2")

select {
case <-firstWatch:
t.Fatalf("FirstWatch channel closed before changes")
case <-getWatch:
t.Fatalf("Get channel closed before changes")
t.Fatalf("GetWatch channel closed before changes")
case <-listWatch:
t.Fatalf("List channel closed before changes")
default:
}

Expand All @@ -651,28 +651,28 @@ func TestDB_GetFirstLast(t *testing.T) {
wtxn.Commit()

select {
case <-firstWatch:
case <-getWatch:
case <-time.After(watchCloseTimeout):
t.Fatalf("FirstWatch channel not closed after change")
t.Fatalf("GetWatch channel not closed after change")
}
select {
case <-getWatch:
case <-listWatch:
case <-time.After(watchCloseTimeout):
t.Fatalf("Get channel not closed after change")
t.Fatalf("List channel not closed after change")
}

// Since we modified the database, grab a fresh read transaction.
txn = db.ReadTxn()

// Test First and Last against the tags multi-index which will
// Test Get and Last against the tags multi-index which will
// return multiple results.
obj, rev, _, ok = table.FirstWatch(txn, tagsIndex.Query("even"))
require.True(t, ok, "expected First(even) to return result")
obj, rev, _, ok = table.GetWatch(txn, tagsIndex.Query("even"))
require.True(t, ok, "expected Get(even) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.ElementsMatch(t, obj.Tags.Slice(), []string{"even", "modified"})
require.EqualValues(t, 2, obj.ID)

iter, _ = table.Get(txn, tagsIndex.Query("odd"))
iter = table.List(txn, tagsIndex.Query("odd"))
items = Collect(iter)
require.Len(t, items, 5, "expected Get(odd) to return 5 items")
for i, item := range items {
Expand All @@ -696,8 +696,8 @@ func TestDB_CommitAbort(t *testing.T) {
assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition")
assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration")

obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected First(1) to return result")
obj, rev, ok := table.Get(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected Get(1) to return result")
require.NotZero(t, rev, "expected non-zero revision")
require.EqualValues(t, obj.ID, 123, "expected obj.ID to equal 123")
require.Zero(t, obj.Tags.Len(), "expected no tags")
Expand All @@ -715,7 +715,7 @@ func TestDB_CommitAbort(t *testing.T) {

// Check that insert after commit and insert after abort do not change the
// table.
obj, newRev, ok := table.First(db.ReadTxn(), idIndex.Query(123))
obj, newRev, ok := table.Get(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected object to exist")
require.Equal(t, rev, newRev, "expected unchanged revision")
require.EqualValues(t, obj.ID, 123, "expected obj.ID to equal 123")
Expand Down Expand Up @@ -745,7 +745,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
table.Insert(wtxn, testObject{ID: 1})
wtxn.Commit()

obj, rev1, ok := table.First(db.ReadTxn(), idIndex.Query(1))
obj, rev1, ok := table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)

// Updating an object with matching revision number works
Expand All @@ -757,7 +757,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

obj, _, ok = table.First(db.ReadTxn(), idIndex.Query(1))
obj, _, ok = table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)
require.Equal(t, 1, obj.Tags.Len())
v, _ := obj.Tags.All().Next()
Expand All @@ -772,7 +772,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

obj, _, ok = table.First(db.ReadTxn(), idIndex.Query(1))
obj, _, ok = table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)
require.Equal(t, 1, obj.Tags.Len())
v, _ = obj.Tags.All().Next()
Expand All @@ -787,7 +787,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

obj, rev2, ok := table.First(db.ReadTxn(), idIndex.Query(1))
obj, rev2, ok := table.Get(db.ReadTxn(), idIndex.Query(1))
require.True(t, ok)
require.Equal(t, 1, obj.Tags.Len())
v, _ = obj.Tags.All().Next()
Expand All @@ -802,7 +802,7 @@ func TestDB_CompareAndSwap_CompareAndDelete(t *testing.T) {
require.EqualValues(t, 1, oldObj.ID)
wtxn.Commit()

_, _, ok = table.First(db.ReadTxn(), idIndex.Query(1))
_, _, ok = table.Get(db.ReadTxn(), idIndex.Query(1))
require.False(t, ok)

// Deleting non-existing object yields not found
Expand Down
4 changes: 2 additions & 2 deletions derive.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type derive[In, Out any] struct {
transform func(obj In, deleted bool) (Out, DeriveResult)
}

func (d derive[In, Out]) loop(ctx context.Context, health cell.Health) error {
func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
out := d.OutTable
txn := d.DB.WriteTxn(d.InTable)
iter, err := d.InTable.Changes(txn)
Expand All @@ -85,7 +85,7 @@ func (d derive[In, Out]) loop(ctx context.Context, health cell.Health) error {
case DeriveInsert:
_, _, err = out.Insert(wtxn, outObj)
case DeriveUpdate:
_, _, found := out.First(wtxn, out.PrimaryIndexer().QueryFromObject(outObj))
_, _, found := out.Get(wtxn, out.PrimaryIndexer().QueryFromObject(outObj))
if found {
_, _, err = out.Insert(wtxn, outObj)
}
Expand Down
22 changes: 11 additions & 11 deletions fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ func allAction(ctx actionContext) {
ctx.log.log("%s: All => %d found", ctx.table.Name(), len(statedb.Collect(iter)))
}

func getAction(ctx actionContext) {
func listAction(ctx actionContext) {
value := mkValue()
iter, _ := ctx.table.Get(ctx.txn, valueIndex.Query(value))
iter := ctx.table.List(ctx.txn, valueIndex.Query(value))
ctx.log.log("%s: Get(%d)", ctx.table.Name(), value)
for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
if e, ok2 := ctx.txnLog.latest[tableAndID{ctx.table.Name(), obj.id}]; ok2 {
Expand All @@ -296,25 +296,25 @@ func getAction(ctx actionContext) {
}
}

func firstAction(ctx actionContext) {
func getAction(ctx actionContext) {
id := mkID()
obj, rev, ok := ctx.table.First(ctx.txn, idIndex.Query(id))
obj, rev, ok := ctx.table.Get(ctx.txn, idIndex.Query(id))

if e, ok2 := ctx.txnLog.latest[tableAndID{ctx.table.Name(), id}]; ok2 {
if e.act == actInsert {
if !ok {
panic("First() returned not found, expected last inserted value")
panic("Get() returned not found, expected last inserted value")
}
if e.value != obj.value {
panic("First() did not return the last write")
panic("Get() did not return the last write")
}
} else if e.act == actDelete {
if ok {
panic("First() returned value even though it was deleted")
panic("Get() returned value even though it was deleted")
}
}
}
ctx.log.log("%s: First(%s) => rev=%d, ok=%v", ctx.table.Name(), id, rev, ok)
ctx.log.log("%s: Get(%s) => rev=%d, ok=%v", ctx.table.Name(), id, rev, ok)
}

func lowerboundAction(ctx actionContext) {
Expand Down Expand Up @@ -358,10 +358,10 @@ var actions = []action{
deleteAction, deleteAction, deleteAction,
deleteManyAction, deleteAllAction,

firstAction, firstAction, firstAction, firstAction, firstAction,
firstAction, firstAction, firstAction, firstAction, firstAction,
firstAction, firstAction, firstAction, firstAction, firstAction,
getAction, getAction, getAction, getAction, getAction,
getAction, getAction, getAction, getAction, getAction,
getAction, getAction, getAction, getAction, getAction,
listAction, listAction, listAction, listAction, listAction,
allAction, allAction,
lowerboundAction, lowerboundAction, lowerboundAction,
prefixAction, prefixAction, prefixAction,
Expand Down
2 changes: 1 addition & 1 deletion reconciler/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func main() {
// Wait for all to be reconciled by waiting for the last added objects to be marked
// reconciled. This only works here since none of the operations fail.
for {
obj, _, watch, ok := testObjects.FirstWatch(db.ReadTxn(), idIndex.Query(id-1))
obj, _, watch, ok := testObjects.GetWatch(db.ReadTxn(), idIndex.Query(id-1))
if ok && obj.status.Kind == reconciler.StatusKindDone {
break
}
Expand Down
2 changes: 1 addition & 1 deletion reconciler/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func registerHTTPServer(
w.WriteHeader(http.StatusOK)

case "DELETE":
memo, _, ok := memos.First(txn, MemoNameIndex.Query(name))
memo, _, ok := memos.Get(txn, MemoNameIndex.Query(name))
if !ok {
w.WriteHeader(http.StatusNotFound)
return
Expand Down
2 changes: 1 addition & 1 deletion reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (round *incrementalRound[Obj]) processRetries() {
}
round.retries.Pop()

obj, rev, found := round.table.First(round.txn, round.primaryIndexer.QueryFromObject(robj.(Obj)))
obj, rev, found := round.table.Get(round.txn, round.primaryIndexer.QueryFromObject(robj.(Obj)))
if found {
status := round.config.GetObjectStatus(obj)
if status.Kind != StatusKindError {
Expand Down
Loading

0 comments on commit 814f9a1

Please sign in to comment.