Skip to content

Commit

Permalink
reconciler: Drop out-of-sync metric and changed parameter from Update
Browse files Browse the repository at this point in the history
The "changed *bool" passed to Update() was confusing and it was often
not easy or cheap to check whether the target had the expected value or
not.

Let's err on the side of simplicity and drop the "out-of-sync" metric
for the time being.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed May 9, 2024
1 parent 7cd6fd4 commit 89f2ff2
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 57 deletions.
5 changes: 1 addition & 4 deletions reconciler/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,8 @@ func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb.
}

// Update implements reconciler.Operations.
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject, changed *bool) error {
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error {
mt.numUpdates.Add(1)
if changed != nil {
*changed = true
}
return nil
}

Expand Down
18 changes: 2 additions & 16 deletions reconciler/example/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package main

import (
"bytes"
"context"
"errors"
"log/slog"
Expand Down Expand Up @@ -75,22 +74,9 @@ func (ops *MemoOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb
}

// Update a memo.
func (ops *MemoOps) Update(ctx context.Context, txn statedb.ReadTxn, memo *Memo, changed *bool) error {
func (ops *MemoOps) Update(ctx context.Context, txn statedb.ReadTxn, memo *Memo) error {
filename := path.Join(ops.directory, memo.Name)

// Read the old file to figure out if it had changed.
// The 'changed' boolean is used by full reconciliation to keep track of when the target
// has gone out-of-sync (e.g. there has been some outside influence to it).
old, err := os.ReadFile(filename)
if err == nil && bytes.Equal(old, []byte(memo.Content)) {

// Nothing to do.
return nil
}
if changed != nil {
*changed = true
}
err = os.WriteFile(filename, []byte(memo.Content), 0644)
err := os.WriteFile(filename, []byte(memo.Content), 0644)
ops.log.Info("Update", "filename", filename, "error", err)
return err
}
Expand Down
12 changes: 1 addition & 11 deletions reconciler/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ import (
// Update() is called for each object. Full reconciliation is used to recover from unexpected outside modifications.
func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error {
var errs []error
outOfSync := false
ops := r.Config.Operations

// First perform pruning to make room in the target.
iter, _ := r.Config.Table.All(txn)
start := time.Now()
if err := ops.Prune(ctx, txn, iter); err != nil {
outOfSync = true
errs = append(errs, fmt.Errorf("pruning failed: %w", err))
}
r.metrics.FullReconciliationDuration(r.ModuleID, OpPrune, time.Since(start))
Expand All @@ -32,12 +30,10 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error
iter, _ = r.Config.Table.All(txn) // Grab a new iterator as Prune() may have consumed it.
for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
start := time.Now()
var changed bool
obj = r.Config.CloneObject(obj)
err := ops.Update(ctx, txn, obj, &changed)
err := ops.Update(ctx, txn, obj)
r.metrics.FullReconciliationDuration(r.ModuleID, OpUpdate, time.Since(start))

outOfSync = outOfSync || changed
if err == nil {
updateResults[obj] = opResult{rev: rev, status: StatusDone()}
r.retries.Clear(obj)
Expand All @@ -47,12 +43,6 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) []error
}
}

// Increment the out-of-sync counter if full reconciliation catched any out-of-sync
// objects.
if outOfSync {
r.metrics.FullReconciliationOutOfSync(r.ModuleID)
}

// Commit the new desired object status. This is performed separately in order
// to not lock the table when performing long-running target operations.
// If the desired object has been updated in the meanwhile the status update is dropped.
Expand Down
2 changes: 1 addition & 1 deletion reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (round *incrementalRound[Obj]) processSingle(obj Obj, rev statedb.Revision,
// Clone the object so it can be mutated by Update()
obj = round.config.CloneObject(obj)
op = OpUpdate
err = round.config.Operations.Update(round.ctx, round.txn, obj, nil /* changed */)
err = round.config.Operations.Update(round.ctx, round.txn, obj)
if err == nil {
round.results[obj] = opResult{rev: rev, status: StatusDone()}
} else {
Expand Down
15 changes: 4 additions & 11 deletions reconciler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type Metrics interface {
IncrementalReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
IncrementalReconciliationErrors(moduleID cell.FullModuleID, errs []error)

FullReconciliationOutOfSync(moduleID cell.FullModuleID)
FullReconciliationErrors(moduleID cell.FullModuleID, errs []error)
FullReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration)
}
Expand All @@ -31,21 +30,16 @@ type ExpVarMetrics struct {
IncrementalReconciliationTotalErrorsVar *expvar.Map
IncrementalReconciliationCurrentErrorsVar *expvar.Map

FullReconciliationCountVar *expvar.Map
FullReconciliationDurationVar *expvar.Map
FullReconciliationTotalErrorsVar *expvar.Map
FullReconciliationCurrentErrorsVar *expvar.Map
FullReconciliationOutOfSyncCountVar *expvar.Map
FullReconciliationCountVar *expvar.Map
FullReconciliationDurationVar *expvar.Map
FullReconciliationTotalErrorsVar *expvar.Map
FullReconciliationCurrentErrorsVar *expvar.Map
}

func (m *ExpVarMetrics) FullReconciliationDuration(moduleID cell.FullModuleID, operation string, duration time.Duration) {
m.FullReconciliationDurationVar.AddFloat(moduleID.String()+"/"+operation, duration.Seconds())
}

func (m *ExpVarMetrics) FullReconciliationOutOfSync(moduleID cell.FullModuleID) {
m.FullReconciliationOutOfSyncCountVar.Add(moduleID.String(), 1)
}

func (m *ExpVarMetrics) FullReconciliationErrors(moduleID cell.FullModuleID, errs []error) {
m.FullReconciliationCountVar.Add(moduleID.String(), 1)
m.FullReconciliationTotalErrorsVar.Add(moduleID.String(), int64(len(errs)))
Expand Down Expand Up @@ -94,6 +88,5 @@ func newExpVarMetrics(publish bool) *ExpVarMetrics {
FullReconciliationDurationVar: newMap("full_reconciliation_duration"),
FullReconciliationTotalErrorsVar: newMap("full_reconciliation_total_errors"),
FullReconciliationCurrentErrorsVar: newMap("full_reconciliation_current_errors"),
FullReconciliationOutOfSyncCountVar: newMap("full_reconciliation_out_of_sync_count"),
}
}
8 changes: 2 additions & 6 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func testReconciler(t *testing.T, batchOps bool) {
}

assert.Greater(t, getInt(expVarMetrics.FullReconciliationCountVar.Get("test")), int64(0), "FullReconciliationCount")
assert.Greater(t, getInt(expVarMetrics.FullReconciliationOutOfSyncCountVar.Get("test")), int64(0), "FullReconciliationOutOfSyncCount")
assert.Greater(t, getFloat(expVarMetrics.FullReconciliationDurationVar.Get("test/prune")), float64(0), "FullReconciliationDuration/prune")
assert.Greater(t, getFloat(expVarMetrics.FullReconciliationDurationVar.Get("test/update")), float64(0), "FullReconciliationDuration/update")
assert.Equal(t, getInt(expVarMetrics.FullReconciliationCurrentErrorsVar.Get("test")), int64(0), "FullReconciliationCurrentErrors")
Expand Down Expand Up @@ -421,7 +420,7 @@ func (mt *mockOps) DeleteBatch(ctx context.Context, txn statedb.ReadTxn, batch [
// UpdateBatch implements reconciler.BatchOperations.
func (mt *mockOps) UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) {
for i := range batch {
batch[i].Result = mt.Update(ctx, txn, batch[i].Object, nil)
batch[i].Result = mt.Update(ctx, txn, batch[i].Object)
}
}

Expand All @@ -444,10 +443,7 @@ func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, iter statedb.
}

// Update implements reconciler.Operations.
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject, changed *bool) error {
if changed != nil {
*changed = true
}
func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error {
mt.updates.incr(obj.id)
if mt.faulty.Load() || obj.faulty {
mt.history.add(opFail(opUpdate(obj.id)))
Expand Down
9 changes: 1 addition & 8 deletions reconciler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,9 @@ type Operations[Obj any] interface {
// reconciliation is performed when the desired state is updated. A full
// reconciliation is done periodically by calling 'Update' on all objects.
//
// If 'changed' is non-nil then the Update must compare the realized state
// with the desired state and set it to true if they differ, e.g. whether
// the operation resulted in a change to the realized state. This is used
// during full reconciliation to catch cases where the realized state has
// gone out of sync due to outside influence. This is tracked in the
// "full_out_of_sync_total" metric.
//
// The object handed to Update is a clone produced by Config.CloneObject
// and thus Update can mutate the object.
Update(ctx context.Context, txn statedb.ReadTxn, obj Obj, changed *bool) error
Update(ctx context.Context, txn statedb.ReadTxn, obj Obj) error

// Delete the object in the target. Same semantics as with Update.
// Deleting a non-existing object is not an error and returns nil.
Expand Down

0 comments on commit 89f2ff2

Please sign in to comment.