Skip to content

Commit

Permalink
reconciler: Move Table from Params to Config
Browse files Browse the repository at this point in the history
With Hive you cannot currently have more than one private provider
for the same type, even when they're in different scopes.

Move the Table from Params to Config so that the table can be
provided manually and not via Hive for situations where the
table cannot be provided via Hive.

The exact setup where this problem occurred was:

  // module 1
  var Cell = cell.Module(...,
    cell.ProvidePrivate(NewRWTableFoo),
    cell.Provide(NewWriter),
  ),
  type Writer struct {
    table RWTable[Foo]
  }
  func NewWriter(t RWTable[Foo]) *Writer
  func (w *Writer) UnsafeRWTable() RWTable[Foo] { return w.table }

  // module 2
  var Cell = cell.Module(...,
    cell.ProvidePrivate((*m1.Writer).UnsafeRWTable),
    cell.Invoke(reconciler.Register[Foo]),
  )

The above lead to:

  level=fatal msg="Failed to apply cell"
  error="cannot provide function \"...UnsafeRWTable: this function introduces a cycle
  (long error about NewWriter depending on RWTable[Foo] provided by UnsafeRWTable)

Hive's (uber/dig's) cycle detection doesn't seem to care about the private providers,
and thinks the private provide of "module 2" can be reached by "NewWriter" in module 1.
So to fix this and add more flexibility, just make the table a config parameter.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Apr 25, 2024
1 parent 125d1ff commit 5108a7e
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 27 deletions.
6 changes: 4 additions & 2 deletions reconciler/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func main() {
"test",
"Test",

cell.Provide(func(db_ *statedb.DB) (statedb.RWTable[*testObject], error) {
cell.Invoke(func(db_ *statedb.DB) error {
db = db_
return testObjects, db.RegisterTable(testObjects)
return db.RegisterTable(testObjects)
}),
cell.Provide(
func() (*mockOps, reconciler.Operations[*testObject]) {
Expand All @@ -133,6 +133,8 @@ func main() {
),
cell.Provide(func() reconciler.Config[*testObject] {
return reconciler.Config[*testObject]{
Table: testObjects,

// Don't run the full reconciliation via timer, but rather explicitly so that the full
// reconciliation operations don't mix with incremental when not expected.
FullReconcilationInterval: time.Hour,
Expand Down
3 changes: 2 additions & 1 deletion reconciler/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ var Hive = hive.New(
),
)

func NewReconcilerConfig(ops reconciler.Operations[*Memo], m *reconciler.ExpVarMetrics) reconciler.Config[*Memo] {
func NewReconcilerConfig(ops reconciler.Operations[*Memo], tbl statedb.RWTable[*Memo], m *reconciler.ExpVarMetrics) reconciler.Config[*Memo] {
return reconciler.Config[*Memo]{
Table: tbl,
Metrics: m,
FullReconcilationInterval: 10 * time.Second,
RetryBackoffMinDuration: 100 * time.Millisecond,
Expand Down
14 changes: 7 additions & 7 deletions reconciler/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (

// full performs full reconciliation of all objects. First the Prune() operations is performed to clean up and then
// Update() is called for each object. Full reconciliation is used to recover from unexpected outside modifications.
func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn, lastRev statedb.Revision) (statedb.Revision, error) {
func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn) (statedb.Revision, error) {
var errs []error
outOfSync := false
ops := r.Config.Operations

// First perform pruning to make room in the target.
iter, _ := r.Table.All(txn)
iter, _ := r.Config.Table.All(txn)
start := time.Now()
if err := ops.Prune(ctx, txn, iter); err != nil {
outOfSync = true
Expand All @@ -29,7 +29,7 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn, lastRev

// Call Update() for each desired object to validate that it is up-to-date.
updateResults := make(map[Obj]opResult)
iter, _ = r.Table.All(txn) // Grab a new iterator as Prune() may have consumed it.
iter, _ = r.Config.Table.All(txn) // Grab a new iterator as Prune() may have consumed it.
for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
start := time.Now()
var changed bool
Expand Down Expand Up @@ -57,10 +57,10 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn, lastRev
// to not lock the table when performing long-running target operations.
// If the desired object has been updated in the meanwhile the status update is dropped.
if len(updateResults) > 0 {
wtxn := r.DB.WriteTxn(r.Table)
wtxn := r.DB.WriteTxn(r.Config.Table)
for obj, result := range updateResults {
obj = r.Config.SetObjectStatus(obj, result.status)
_, _, err := r.Table.CompareAndSwap(wtxn, result.rev, obj)
_, _, err := r.Config.Table.CompareAndSwap(wtxn, result.rev, obj)
if err == nil && result.status.Kind != StatusKindDone {
// Object had not changed in the meantime, queue the retry.
r.retries.Add(obj)
Expand All @@ -71,10 +71,10 @@ func (r *reconciler[Obj]) full(ctx context.Context, txn statedb.ReadTxn, lastRev

r.metrics.FullReconciliationErrors(r.ModuleID, errs)
if len(errs) > 0 {
return r.Table.Revision(txn), fmt.Errorf("full: %w", joinErrors(errs))
return r.Config.Table.Revision(txn), fmt.Errorf("full: %w", joinErrors(errs))
}

// Sync succeeded up to latest revision. Continue incremental reconciliation from
// this revision.
return r.Table.Revision(txn), nil
return r.Config.Table.Revision(txn), nil
}
2 changes: 1 addition & 1 deletion reconciler/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *reconciler[Obj]) incremental(ctx context.Context, txn statedb.ReadTxn,
oldRevision: rev,
ctx: ctx,
txn: txn,
table: r.Table,
table: r.Config.Table,
results: make(map[Obj]opResult),
}

Expand Down
28 changes: 14 additions & 14 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ import (
// Register creates a new reconciler and registers to the application
// lifecycle. To be used with cell.Invoke when the API of the reconciler
// is not needed.
func Register[Obj comparable](p Params[Obj]) error {
_, err := New(p)
func Register[Obj comparable](cfg Config[Obj], params Params) error {
_, err := New(cfg, params)
return err
}

// New creates and registers a new reconciler.
func New[Obj comparable](p Params[Obj]) (Reconciler[Obj], error) {
if err := p.Config.validate(); err != nil {
func New[Obj comparable](cfg Config[Obj], p Params) (Reconciler[Obj], error) {
if err := cfg.validate(); err != nil {
return nil, err
}

metrics := p.Config.Metrics
metrics := cfg.Metrics
if metrics == nil {
if p.DefaultMetrics == nil {
metrics = NewUnpublishedExpVarMetrics()
Expand All @@ -39,14 +39,15 @@ func New[Obj comparable](p Params[Obj]) (Reconciler[Obj], error) {
}
}

idx := p.Table.PrimaryIndexer()
idx := cfg.Table.PrimaryIndexer()
objectToKey := func(o any) index.Key {
return idx.ObjectToKey(o.(Obj))
}
r := &reconciler[Obj]{
Params: p,
Config: cfg,
metrics: metrics,
retries: newRetries(p.Config.RetryBackoffMinDuration, p.Config.RetryBackoffMaxDuration, objectToKey),
retries: newRetries(cfg.RetryBackoffMinDuration, cfg.RetryBackoffMaxDuration, objectToKey),
externalFullTrigger: make(chan struct{}, 1),
primaryIndexer: idx,
}
Expand All @@ -59,22 +60,21 @@ func New[Obj comparable](p Params[Obj]) (Reconciler[Obj], error) {
return r, nil
}

type Params[Obj comparable] struct {
type Params struct {
cell.In

Config Config[Obj]
Lifecycle cell.Lifecycle
Log *slog.Logger
DB *statedb.DB
Table statedb.RWTable[Obj]
Jobs job.Registry
ModuleID cell.FullModuleID
Health cell.Health
DefaultMetrics Metrics `optional:"true"`
}

type reconciler[Obj comparable] struct {
Params[Obj]
Params
Config Config[Obj]
metrics Metrics
retries *retries
externalFullTrigger chan struct{}
Expand Down Expand Up @@ -158,7 +158,7 @@ func (r *reconciler[Obj]) loop(ctx context.Context, health cell.Health) error {
errs = append(errs, err)
}

if fullReconciliation && r.Table.Initialized(txn) {
if fullReconciliation && r.Config.Table.Initialized(txn) {
// Time to perform a full reconciliation. An incremental reconciliation
// has been performed prior to this, so the assumption is that everything
// is up to date (provided incremental reconciliation did not fail). We
Expand All @@ -170,14 +170,14 @@ func (r *reconciler[Obj]) loop(ctx context.Context, health cell.Health) error {
fullReconciliation = false

var err error
revision, err = r.full(ctx, txn, revision)
revision, err = r.full(ctx, txn)
if err != nil {
errs = append(errs, err)
}
}

if len(errs) == 0 {
health.OK(fmt.Sprintf("OK, %d objects", r.Table.NumObjects(txn)))
health.OK(fmt.Sprintf("OK, %d objects", r.Config.Table.NumObjects(txn)))
} else {
health.Degraded("Reconciliation failed", errors.Join(errs...))
}
Expand Down
5 changes: 3 additions & 2 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func testReconciler(t *testing.T, batchOps bool) {
return expVarMetrics
}),

cell.Provide(func(db_ *statedb.DB) (statedb.RWTable[*testObject], error) {
cell.Invoke(func(db_ *statedb.DB) error {
db = db_
return testObjects, db.RegisterTable(testObjects)
return db.RegisterTable(testObjects)
}),
cell.Provide(func() reconciler.Config[*testObject] {
cfg := reconciler.Config[*testObject]{
Table: testObjects,
// Don't run the full reconciliation via timer, but rather explicitly so that the full
// reconciliation operations don't mix with incremental when not expected.
FullReconcilationInterval: time.Hour,
Expand Down
6 changes: 6 additions & 0 deletions reconciler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Reconciler[Obj any] interface {
}

type Config[Obj any] struct {
// Table to reconcile.
Table statedb.RWTable[Obj]

// Metrics to use with this reconciler. The metrics capture the duration
// of operations during incremental and full reconcilation and the errors
// that occur during either.
Expand Down Expand Up @@ -88,6 +91,9 @@ type Config[Obj any] struct {
}

func (cfg Config[Obj]) validate() error {
if cfg.Table == nil {
return fmt.Errorf("%T.Table cannot be nil", cfg)
}
if cfg.GetObjectStatus == nil {
return fmt.Errorf("%T.GetObjectStatus cannot be nil", cfg)
}
Expand Down

0 comments on commit 5108a7e

Please sign in to comment.