Skip to content

Commit

Permalink
Add support for table initializers to gate keep pruning
Browse files Browse the repository at this point in the history
It's often important to avoid pruning data before the table has
been fully populated by writers. Add support for this in the form
of table initializers that can be registered before the application
starts. Make the reconciler pruning conditional on the table being
fully initialized.

The initializers are stored in the table entry, which means that
the initializer count is coupled to the snapshot and thus a reader
won't think the table is initialized before it gets a snapshot that
indeed is initialized.

Example:

  // Adding an initializer (called before start)
  txn := db.WriteTxn(myTable)
  done := myTable.RegisterInitializer(txn)
  txn.Commit()

  rtxn1 := db.ReadTxn()
  myTable.Initialized(rtxn1) == false

  txn = db.WriteTxn(myTable)
  done(txn)
  txn.Commit()

  rtxn2 := db.ReadTxn()
  myTable.Initialized(rtxn1) == false
  myTable.Initialized(rtxn2) == true

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Apr 2, 2024
1 parent c8b09a3 commit 7319e92
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 1 deletion.
2 changes: 1 addition & 1 deletion reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (r *reconciler[Obj]) loop(ctx context.Context, health cell.Health) error {
errs = append(errs, err)
}

if fullReconciliation {
if fullReconciliation && r.Table.Initialized(txn) {
// Time to perform a full reconciliation. An incremental reconciliation
// has been performed prior to this, so the assumption is that everything
// is up to date (provided incremental reconciliation did not fail). We
Expand Down
20 changes: 20 additions & 0 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ func testReconciler(t *testing.T, batchOps bool) {
h.expectOp(opPrune(0))
h.expectHealthLevel(cell.StatusOK)

// Register a table initializer to prohibit pruning.
markInitialized := h.registerInitializer()

// With table not initialized, we should not see the prune.
h.insert(ID_1, NonFaulty, reconciler.StatusPending())
h.triggerFullReconciliation()
h.expectOp(opUpdate(ID_1))
markInitialized()

// Add few objects and wait until incremental reconciliation is done.
t.Log("Insert test objects")
h.insert(ID_1, NonFaulty, reconciler.StatusPending())
Expand Down Expand Up @@ -455,6 +464,17 @@ const (
NonFaulty = false
)

func (h testHelper) registerInitializer() func() {
wtxn := h.db.WriteTxn(h.tbl)
done := h.tbl.RegisterInitializer(wtxn)
wtxn.Commit()
return func() {
wtxn := h.db.WriteTxn(h.tbl)
done(wtxn)
wtxn.Commit()
}
}

func (h testHelper) insert(id uint64, faulty bool, status reconciler.Status) {
wtxn := h.db.WriteTxn(h.tbl)
_, _, err := h.tbl.Insert(wtxn, &testObject{
Expand Down
22 changes: 22 additions & 0 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package statedb
import (
"fmt"
"strings"
"sync"

"k8s.io/apimachinery/pkg/util/sets"

Expand Down Expand Up @@ -167,6 +168,27 @@ func (t *genTable[Obj]) ToTable() Table[Obj] {
return t
}

func (t *genTable[Obj]) Initialized(txn ReadTxn) bool {
return txn.getTxn().root[t.pos].initializers == 0
}

func (t *genTable[Obj]) RegisterInitializer(txn WriteTxn) func(WriteTxn) {
table := txn.getTxn().modifiedTables[t.pos]
if table != nil {
table.initializers++
return func(txn WriteTxn) {
var once sync.Once
once.Do(func() {
if table := txn.getTxn().modifiedTables[t.pos]; table != nil {
table.initializers--
}
})
}
} else {
return func(WriteTxn) {}
}
}

func (t *genTable[Obj]) Revision(txn ReadTxn) Revision {
return txn.getTxn().getRevision(t)
}
Expand Down
11 changes: 11 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Table[Obj any] interface {
// NumObjects returns the number of objects stored in the table.
NumObjects(ReadTxn) int

// Initialized returns true if the registered table initializers have
// completed.
Initialized(ReadTxn) bool

// Revision of the table. Constant for a read transaction, but
// increments in a write transaction on each Insert and Delete.
Revision(ReadTxn) Revision
Expand Down Expand Up @@ -79,6 +83,12 @@ type RWTable[Obj any] interface {
// write transaction return the fresh uncommitted modifications if any.
Table[Obj]

// RegisterInitializer adds an initializers to the table. Returns
// a function to mark the initializer done. Once all initializers are
// done, Table[*].Initialized() will return true.
// This should only be used before the application has started.
RegisterInitializer(WriteTxn) func(WriteTxn)

// ToTable returns the Table[Obj] interface. Useful with cell.Provide
// to avoid the anonymous function:
//
Expand Down Expand Up @@ -335,6 +345,7 @@ type tableEntry struct {
indexes []indexEntry
deleteTrackers *iradix.Tree[deleteTracker]
revision uint64
initializers int // Number of table initializers pending
}

func (t *tableEntry) numObjects() int {
Expand Down

0 comments on commit 7319e92

Please sign in to comment.