Skip to content

Commit

Permalink
reconciler: Remove StatusKindDelete
Browse files Browse the repository at this point in the history
To be able to use multiple reconcilers against a single object we cannot
do soft-deletes and have the reconciler delete the object, as then we would
have multiple reconcilers wanting to delete the same object.

To fix this, drop the StatusKindDelete and use the ChangeIterator to iterate
over both inserts and deletes.

The downside is that we can't observe failed deletions via the table, but it
will be visible via module health. On the other hand, it is less confusing
to users to not have deleted objects visible in the tables.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed May 1, 2024
1 parent 8d8d9d5 commit 90fe98b
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 191 deletions.
2 changes: 1 addition & 1 deletion iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (it *changeIterator[Obj]) Watch(txn ReadTxn) <-chan struct{} {

updateIter, watch := it.table.LowerBound(txn, ByRevision[Obj](it.revision+1))
deleteIter := it.dt.deleted(txn, it.revision+1)
it.iter = NewDualIterator[Obj](deleteIter, updateIter)
it.iter = NewDualIterator(deleteIter, updateIter)

// It is enough to watch the revision index and not the graveyard since
// any object that is inserted into the graveyard will be deleted from
Expand Down
18 changes: 17 additions & 1 deletion reconciler/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"

"github.com/cilium/hive"
Expand All @@ -28,7 +29,7 @@ var logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
var memprofile = flag.String("memprofile", "", "write memory profile to `file`")
var numObjects = flag.Int("objects", 100000, "number of objects to create")
var numObjects = flag.Int("objects", 1000000, "number of objects to create")
var batchSize = flag.Int("batchsize", 1000, "batch size for writes")
var incrBatchSize = flag.Int("incrbatchsize", 1000, "maximum batch size for incremental reconciliation")
var quiet = flag.Bool("quiet", false, "quiet output for CI")
Expand All @@ -53,6 +54,7 @@ func (t *testObject) Clone() *testObject {
}

type mockOps struct {
numUpdates atomic.Int32
}

// Delete implements reconciler.Operations.
Expand All @@ -67,6 +69,7 @@ func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb.

// Update implements reconciler.Operations.
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject, changed *bool) error {
mt.numUpdates.Add(1)
if changed != nil {
*changed = true
}
Expand Down Expand Up @@ -198,6 +201,19 @@ func main() {
timePerObject := float64(duration) / float64(*numObjects)
objsPerSecond := float64(time.Second) / timePerObject

// Check that all objects were updated.
if mt.numUpdates.Load() != int32(*numObjects) {
log.Fatalf("expected %d updates, but only saw %d", *numObjects, mt.numUpdates.Load())
}

// Check that all statuses are correctly set.
iter, _ := testObjects.All(db.ReadTxn())
for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
if obj.status.Kind != reconciler.StatusKindDone {
log.Fatalf("Object with unexpected status: %#v", obj)
}
}

if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions reconciler/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ func registerHTTPServer(
w.WriteHeader(http.StatusNotFound)
return
}
memo = memo.Clone().SetStatus(reconciler.StatusPendingDelete())
memos.Insert(txn, memo)
memos.Delete(txn, memo)
log.Info("Deleted memo", "name", name)
w.WriteHeader(http.StatusOK)
}
Expand Down
11 changes: 3 additions & 8 deletions reconciler/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// full performs full reconciliation of all objects. First the Prune() operations is performed to clean up and then
// Update() is called for each object. Full reconciliation is used to recover from unexpected outside modifications.
func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) (statedb.Revision, error) {
func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error {
var errs []error
outOfSync := false
ops := r.Config.Operations
Expand Down Expand Up @@ -42,7 +42,7 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) (stated
updateResults[obj] = opResult{rev: rev, status: StatusDone()}
r.retries.Clear(obj)
} else {
updateResults[obj] = opResult{rev: rev, status: StatusError(false, err)}
updateResults[obj] = opResult{rev: rev, status: StatusError(err)}
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -70,11 +70,6 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) (stated
}

r.metrics.FullReconciliationErrors(r.ModuleID, errs)
if len(errs) > 0 {
return r.Config.Table.Revision(txn), fmt.Errorf("full: %w", joinErrors(errs))
}

// Sync succeeded up to latest revision. Continue incremental reconciliation from
// this revision.
return r.Config.Table.Revision(txn), nil
return errs
}
2 changes: 1 addition & 1 deletion reconciler/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func omittedError(n int) error {

func joinErrors(errs []error) error {
if len(errs) > maxJoinedErrors {
errs = append(slices.Clone(errs)[:maxJoinedErrors], omittedError(len(errs)))
errs = append(slices.Clone(errs)[:maxJoinedErrors], omittedError(len(errs)-maxJoinedErrors))
}
return errors.Join(errs...)
}
127 changes: 37 additions & 90 deletions reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package reconciler

import (
"context"
"fmt"
"time"

"github.com/cilium/hive/cell"
Expand All @@ -23,7 +22,6 @@ type incrementalRound[Obj comparable] struct {
ctx context.Context
txn statedb.ReadTxn
table statedb.RWTable[Obj]
oldRevision statedb.Revision

// numReconciled counts the number of objects that have been reconciled in this
// round, both for new & changed objects and for retried objects. If
Expand All @@ -32,7 +30,7 @@ type incrementalRound[Obj comparable] struct {
// reconciliation per object is slow.
numReconciled int

// results collects the results of update and delete operations.
// results collects the results of update operations.
// The results are committed in a separate write transaction in order to
// not lock the table while reconciling. If an object has changed in the meanwhile
// the stale reconciliation result for that object is dropped.
Expand All @@ -41,15 +39,14 @@ type incrementalRound[Obj comparable] struct {
errs []error
}

func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn, rev statedb.Revision) (statedb.Revision, <-chan struct{}, error) {
func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn, changes statedb.ChangeIterator[Obj]) []error {
round := incrementalRound[Obj]{
moduleID: r.ModuleID,
metrics: r.metrics,
config: &r.Config,
retries: r.retries,
primaryIndexer: r.primaryIndexer,
db: r.DB,
oldRevision: rev,
ctx: ctx,
txn: txn,
table: r.Config.Table,
Expand All @@ -58,42 +55,30 @@ func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn,

// Reconcile new and changed objects using either Operations
// or BatchOperations.
var newRevision statedb.Revision
if r.Config.BatchOperations != nil {
newRevision = round.batch()
round.batch(changes)
} else {
newRevision = round.single()
round.single(changes)
}

// Process objects that need to be retried that were not cleared.
round.processRetries()

// Finally commit the status updates.
watch := round.commitStatus()

if round.numReconciled >= r.Config.IncrementalRoundSize {
// Round size limit was hit, use a closed watch channel to retrigger
// incremental reconciliation immediately.
watch = closedWatchChannel
}
round.commitStatus()

r.metrics.IncrementalReconciliationErrors(r.ModuleID, round.errs)

if len(round.errs) > 0 {
return newRevision, watch, fmt.Errorf("incremental: %w", joinErrors(round.errs))
}
return newRevision, watch, nil
return round.errs
}

func (round *incrementalRound[Obj]) single() statedb.Revision {
func (round *incrementalRound[Obj]) single(changes statedb.ChangeIterator[Obj]) {
// Iterate in revision order through new and changed objects.
newRevision := round.oldRevision
iter, _ := round.table.LowerBound(round.txn, statedb.ByRevision[Obj](round.oldRevision+1))
for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
newRevision = rev
for change, _, ok := changes.Next(); ok; change, _, ok = changes.Next() {
obj := change.Object

status := round.config.GetObjectStatus(obj)
if status.Kind != StatusKindPending {
if !change.Deleted && status.Kind != StatusKindPending {
// Only process objects that are pending reconciliation, e.g.
// changed from outside.
// Failures (e.g. StatusKindError) are processed via the retry queue.
Expand All @@ -103,32 +88,27 @@ func (round *incrementalRound[Obj]) single() statedb.Revision {
// Clear retries as the object has changed.
round.retries.Clear(obj)

err := round.processSingle(obj, rev, status.Delete)
err := round.processSingle(obj, change.Revision, change.Deleted)
if err != nil {
round.errs = append(round.errs, err)
}

round.numReconciled++
if round.numReconciled >= round.config.IncrementalRoundSize {
break
}
}
return newRevision
}

func (round *incrementalRound[Obj]) batch() statedb.Revision {
func (round *incrementalRound[Obj]) batch(changes statedb.ChangeIterator[Obj]) {
ops := round.config.BatchOperations
updateBatch := []BatchEntry[Obj]{}
deleteBatch := []BatchEntry[Obj]{}

// Iterate in revision order through new and changed objects.
newRevision := round.oldRevision
iter, _ := round.table.LowerBound(round.txn, statedb.ByRevision[Obj](round.oldRevision+1))
for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
newRevision = rev
for change, rev, ok := changes.Next(); ok; change, rev, ok = changes.Next() {
obj := change.Object

status := round.config.GetObjectStatus(obj)
if status.Kind != StatusKindPending {
if !change.Deleted && status.Kind != StatusKindPending {
// Only process objects that are pending reconciliation, e.g.
// changed from outside.
// Failures (e.g. StatusKindError) are processed via the retry queue.
Expand All @@ -141,7 +121,7 @@ func (round *incrementalRound[Obj]) batch() statedb.Revision {
// Clone the object so we or the operations can mutate it.
obj = round.config.CloneObject(obj)

if status.Delete {
if change.Deleted {
deleteBatch = append(deleteBatch, BatchEntry[Obj]{Object: obj, Revision: rev})
} else {
updateBatch = append(updateBatch, BatchEntry[Obj]{Object: obj, Revision: rev})
Expand All @@ -163,13 +143,10 @@ func (round *incrementalRound[Obj]) batch() statedb.Revision {
time.Since(start),
)
for _, entry := range deleteBatch {
if entry.Result == nil {
// Reconciling succeeded, so clear the retries.
round.retries.Clear(entry.Object)
round.results[entry.Object] = opResult{rev: entry.Revision, delete: true}
} else {
if entry.Result != nil {
// Delete failed, queue a retry for it.
round.errs = append(round.errs, entry.Result)
round.results[entry.Object] = opResult{rev: entry.Revision, status: StatusError(true, entry.Result)}
round.retries.Add(entry.Object)
}
}
}
Expand All @@ -191,11 +168,10 @@ func (round *incrementalRound[Obj]) batch() statedb.Revision {
round.results[entry.Object] = opResult{rev: entry.Revision, status: StatusDone()}
} else {
round.errs = append(round.errs, entry.Result)
round.results[entry.Object] = opResult{rev: entry.Revision, status: StatusError(false, entry.Result)}
round.results[entry.Object] = opResult{rev: entry.Revision, status: StatusError(entry.Result)}
}
}
}
return newRevision
}

func (round *incrementalRound[Obj]) processRetries() {
Expand All @@ -207,20 +183,17 @@ func (round *incrementalRound[Obj]) processRetries() {
}
round.retries.Pop()

obj, rev, ok := round.table.First(round.txn, round.primaryIndexer.QueryFromObject(robj.(Obj)))
if !ok {
// Object has been deleted unexpectedly (e.g. from outside
// the reconciler). Assume that it can be forgotten about.
round.retries.Clear(robj)
continue
}

status := round.config.GetObjectStatus(obj)
if status.Kind != StatusKindError {
continue
obj, rev, found := round.table.First(round.txn, round.primaryIndexer.QueryFromObject(robj.(Obj)))
if found {
status := round.config.GetObjectStatus(obj)
if status.Kind != StatusKindError {
continue
}
} else {
obj = robj.(Obj)
}

err := round.processSingle(obj, rev, status.Delete)
err := round.processSingle(obj, rev, !found)
if err != nil {
round.errs = append(round.errs, err)
}
Expand All @@ -232,29 +205,26 @@ func (round *incrementalRound[Obj]) processRetries() {
func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision, delete bool) error {
start := time.Now()

// Clone the object so it can be mutated by the operations and to be able to set the
// status.
obj = round.config.CloneObject(obj)

var (
err error
op string
)
if delete {
op = OpDelete
err = round.config.Operations.Delete(round.ctx, round.txn, obj)
if err == nil {
round.results[obj] = opResult{rev: rev, delete: true}
} else {
round.results[obj] = opResult{rev: rev, status: StatusError(true, err)}
if err != nil {
// Deletion failed. Retry again later.
round.retries.Add(obj)
}
} else {
// Clone the object so it can be mutated by Update()
obj = round.config.CloneObject(obj)
op = OpUpdate
err = round.config.Operations.Update(round.ctx, round.txn, obj, nil /* changed */)
if err == nil {
round.results[obj] = opResult{rev: rev, status: StatusDone()}
} else {
round.results[obj] = opResult{rev: rev, status: StatusError(false, err)}
round.results[obj] = opResult{rev: rev, status: StatusError(err)}
}
}
round.metrics.IncrementalReconciliationDuration(round.moduleID, op, time.Since(start))
Expand All @@ -268,29 +238,17 @@ func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision,

}

func (round *incrementalRound[Obj]) commitStatus() <-chan struct{} {
func (round *incrementalRound[Obj]) commitStatus() {
if len(round.results) == 0 {
// Nothing to commit.
_, watch := round.table.All(round.txn)
return watch
return
}

wtxn := round.db.WriteTxn(round.table)
defer wtxn.Commit()

// The revision up to which we just now reconciled.
revReconciled := round.table.Revision(round.txn)

// The revision before committing the status updates.
revBeforeWrite := round.table.Revision(wtxn)

// Commit status for updated objects.
for obj, result := range round.results {
if result.delete {
round.table.CompareAndDelete(wtxn, result.rev, obj)
continue
}

// Update the object if it is unchanged. It may happen that the object has
// been updated in the meanwhile, in which case we ignore the status as the
// update will be picked up by next reconciliation round.
Expand All @@ -302,15 +260,4 @@ func (round *incrementalRound[Obj]) commitStatus() <-chan struct{} {
round.retries.Add(obj)
}
}

watch := closedWatchChannel
if revReconciled == revBeforeWrite {
// No outside changes happened between the ReadTxn and this WriteTxn. Grab a new
// watch channel of the root to only watch for new changes after this write.
//
// If changes did happen, we'll return a closed watch channel and
// immediately reconcile again.
_, watch = round.table.All(wtxn)
}
return watch
}
Loading

0 comments on commit 90fe98b

Please sign in to comment.