diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index b93de76..eebf0c1 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -152,7 +152,7 @@ func (r *reconciler[Obj]) loop(ctx context.Context, health cell.Health) error { errs = append(errs, err) } - if fullReconciliation { + if fullReconciliation && r.Table.Initialized(txn) { // Time to perform a full reconciliation. An incremental reconciliation // has been performed prior to this, so the assumption is that everything // is up to date (provided incremental reconciliation did not fail). We diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index 12bf36b..5ab8e19 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -194,6 +194,15 @@ func testReconciler(t *testing.T, batchOps bool) { h.expectOp(opPrune(0)) h.expectHealthLevel(cell.StatusOK) + // Register a table initializer to prohibit pruning. + markInitialized := h.registerInitializer() + + // With table not initialized, we should not see the prune. + h.insert(ID_1, NonFaulty, reconciler.StatusPending()) + h.triggerFullReconciliation() + h.expectOp(opUpdate(ID_1)) + markInitialized() + // Add few objects and wait until incremental reconciliation is done. t.Log("Insert test objects") h.insert(ID_1, NonFaulty, reconciler.StatusPending()) @@ -455,6 +464,17 @@ const ( NonFaulty = false ) +func (h testHelper) registerInitializer() func() { + wtxn := h.db.WriteTxn(h.tbl) + done := h.tbl.RegisterInitializer(wtxn) + wtxn.Commit() + return func() { + wtxn := h.db.WriteTxn(h.tbl) + done(wtxn) + wtxn.Commit() + } +} + func (h testHelper) insert(id uint64, faulty bool, status reconciler.Status) { wtxn := h.db.WriteTxn(h.tbl) _, _, err := h.tbl.Insert(wtxn, &testObject{ diff --git a/table.go b/table.go index 39264fe..1d8906e 100644 --- a/table.go +++ b/table.go @@ -6,6 +6,7 @@ package statedb import ( "fmt" "strings" + "sync" "k8s.io/apimachinery/pkg/util/sets" @@ -167,6 +168,27 @@ func (t *genTable[Obj]) ToTable() Table[Obj] { return t } +func (t *genTable[Obj]) Initialized(txn ReadTxn) bool { + return txn.getTxn().root[t.pos].initializers == 0 +} + +func (t *genTable[Obj]) RegisterInitializer(txn WriteTxn) func(WriteTxn) { + table := txn.getTxn().modifiedTables[t.pos] + if table != nil { + table.initializers++ + return func(txn WriteTxn) { + var once sync.Once + once.Do(func() { + if table := txn.getTxn().modifiedTables[t.pos]; table != nil { + table.initializers-- + } + }) + } + } else { + return func(WriteTxn) {} + } +} + func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { return txn.getTxn().getRevision(t) } diff --git a/types.go b/types.go index 2225015..182d133 100644 --- a/types.go +++ b/types.go @@ -31,6 +31,10 @@ type Table[Obj any] interface { // NumObjects returns the number of objects stored in the table. NumObjects(ReadTxn) int + // Initialized returns true if the registered table initializers have + // completed. + Initialized(ReadTxn) bool + // Revision of the table. Constant for a read transaction, but // increments in a write transaction on each Insert and Delete. Revision(ReadTxn) Revision @@ -79,6 +83,12 @@ type RWTable[Obj any] interface { // write transaction return the fresh uncommitted modifications if any. Table[Obj] + // RegisterInitializer adds an initializers to the table. Returns + // a function to mark the initializer done. Once all initializers are + // done, Table[*].Initialized() will return true. + // This should only be used before the application has started. + RegisterInitializer(WriteTxn) func(WriteTxn) + // ToTable returns the Table[Obj] interface. Useful with cell.Provide // to avoid the anonymous function: // @@ -335,6 +345,7 @@ type tableEntry struct { indexes []indexEntry deleteTrackers *iradix.Tree[deleteTracker] revision uint64 + initializers int // Number of table initializers pending } func (t *tableEntry) numObjects() int {