From 7319e926b6c57249c640b13c4c30d6a30b744f1d Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Thu, 28 Mar 2024 15:43:40 +0100 Subject: [PATCH] Add support for table initializers to gate keep pruning It's often important to avoid pruning data before the table has been fully populated by writers. Add support for this in the form of table initializers that can be registered before the application starts. Make the reconciler pruning conditional on the table being fully initialized. The initializers are stored in the table entry, which means that the initializer count is coupled to the snapshot and thus a reader won't think the table is initialized before it gets a snapshot that indeed is initialized. Example: // Adding an initializer (called before start) txn := db.WriteTxn(myTable) done := myTable.RegisterInitializer(txn) txn.Commit() rtxn1 := db.ReadTxn() myTable.Initialized(rtxn1) == false txn = db.WriteTxn(myTable) done(txn) txn.Commit() rtxn2 := db.ReadTxn() myTable.Initialized(rtxn1) == false myTable.Initialized(rtxn2) == true Signed-off-by: Jussi Maki --- reconciler/reconciler.go | 2 +- reconciler/reconciler_test.go | 20 ++++++++++++++++++++ table.go | 22 ++++++++++++++++++++++ types.go | 11 +++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index b93de76..eebf0c1 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -152,7 +152,7 @@ func (r *reconciler[Obj]) loop(ctx context.Context, health cell.Health) error { errs = append(errs, err) } - if fullReconciliation { + if fullReconciliation && r.Table.Initialized(txn) { // Time to perform a full reconciliation. An incremental reconciliation // has been performed prior to this, so the assumption is that everything // is up to date (provided incremental reconciliation did not fail). We diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index 12bf36b..5ab8e19 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -194,6 +194,15 @@ func testReconciler(t *testing.T, batchOps bool) { h.expectOp(opPrune(0)) h.expectHealthLevel(cell.StatusOK) + // Register a table initializer to prohibit pruning. + markInitialized := h.registerInitializer() + + // With table not initialized, we should not see the prune. + h.insert(ID_1, NonFaulty, reconciler.StatusPending()) + h.triggerFullReconciliation() + h.expectOp(opUpdate(ID_1)) + markInitialized() + // Add few objects and wait until incremental reconciliation is done. t.Log("Insert test objects") h.insert(ID_1, NonFaulty, reconciler.StatusPending()) @@ -455,6 +464,17 @@ const ( NonFaulty = false ) +func (h testHelper) registerInitializer() func() { + wtxn := h.db.WriteTxn(h.tbl) + done := h.tbl.RegisterInitializer(wtxn) + wtxn.Commit() + return func() { + wtxn := h.db.WriteTxn(h.tbl) + done(wtxn) + wtxn.Commit() + } +} + func (h testHelper) insert(id uint64, faulty bool, status reconciler.Status) { wtxn := h.db.WriteTxn(h.tbl) _, _, err := h.tbl.Insert(wtxn, &testObject{ diff --git a/table.go b/table.go index 39264fe..1d8906e 100644 --- a/table.go +++ b/table.go @@ -6,6 +6,7 @@ package statedb import ( "fmt" "strings" + "sync" "k8s.io/apimachinery/pkg/util/sets" @@ -167,6 +168,27 @@ func (t *genTable[Obj]) ToTable() Table[Obj] { return t } +func (t *genTable[Obj]) Initialized(txn ReadTxn) bool { + return txn.getTxn().root[t.pos].initializers == 0 +} + +func (t *genTable[Obj]) RegisterInitializer(txn WriteTxn) func(WriteTxn) { + table := txn.getTxn().modifiedTables[t.pos] + if table != nil { + table.initializers++ + return func(txn WriteTxn) { + var once sync.Once + once.Do(func() { + if table := txn.getTxn().modifiedTables[t.pos]; table != nil { + table.initializers-- + } + }) + } + } else { + return func(WriteTxn) {} + } +} + func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { return txn.getTxn().getRevision(t) } diff --git a/types.go b/types.go index 2225015..182d133 100644 --- a/types.go +++ b/types.go @@ -31,6 +31,10 @@ type Table[Obj any] interface { // NumObjects returns the number of objects stored in the table. NumObjects(ReadTxn) int + // Initialized returns true if the registered table initializers have + // completed. + Initialized(ReadTxn) bool + // Revision of the table. Constant for a read transaction, but // increments in a write transaction on each Insert and Delete. Revision(ReadTxn) Revision @@ -79,6 +83,12 @@ type RWTable[Obj any] interface { // write transaction return the fresh uncommitted modifications if any. Table[Obj] + // RegisterInitializer adds an initializers to the table. Returns + // a function to mark the initializer done. Once all initializers are + // done, Table[*].Initialized() will return true. + // This should only be used before the application has started. + RegisterInitializer(WriteTxn) func(WriteTxn) + // ToTable returns the Table[Obj] interface. Useful with cell.Provide // to avoid the anonymous function: // @@ -335,6 +345,7 @@ type tableEntry struct { indexes []indexEntry deleteTrackers *iradix.Tree[deleteTracker] revision uint64 + initializers int // Number of table initializers pending } func (t *tableEntry) numObjects() int {