Skip to content

Commit

Permalink
Fix race with table init channel
Browse files Browse the repository at this point in the history
The function to mark a table initialized wrongly closed the
channel immediately. This could cause an observer to be woken
up too early (before Commit() has finished) and get an uninitialized
snapshot.

Fix the timing issue by moving the closing into Commit(), next to
the normal watch channel closing. This way the initialized table has
been committed before the init watch channel closes.

This issue was detected by `pkg/k8s/statedb_test.go` in cilium/cilium:

     statedb_test.go:263:
         	Error Trace:	/home/runner/work/cilium/cilium/pkg/k8s/statedb_test.go:263
         	            	/home/runner/work/cilium/cilium/pkg/k8s/statedb_test.go:120
         	Error:      	Not equal:
         	            	expected: "obj1"
         	            	actual  : "garbage"

         	            	Diff:
         	            	--- Expected
         	            	+++ Actual
         	            	@@ -1 +1 @@
         	            	-obj1
         	            	+garbage
         	Test:       	TestStateDBReflector/default

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Sep 6, 2024
1 parent 574b83f commit f96349e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
4 changes: 0 additions & 4 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,6 @@ func (t *genTable[Obj]) RegisterInitializer(txn WriteTxn, name string) func(Writ
slices.Clone(table.pendingInitializers),
func(n string) bool { return n == name },
)
if !table.initialized && len(table.pendingInitializers) == 0 {
close(table.initWatchChan)
table.initialized = true
}
}
})
}
Expand Down
14 changes: 14 additions & 0 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,18 @@ func (txn *txn) Commit() ReadTxn {
root := *db.root.Load()
root = slices.Clone(root)

var initChansToClose []chan struct{}

// Insert the modified tables into the root tree of tables.
for pos, table := range txn.modifiedTables {
if table != nil {
// Check if tables become initialized. We close the channel only after
// we've swapped in the new root so that one cannot get a snapshot of
// an uninitialized table after observing the channel closing.
if !table.initialized && len(table.pendingInitializers) == 0 {
initChansToClose = append(initChansToClose, table.initWatchChan)
table.initialized = true
}
root[pos] = *table
}
}
Expand All @@ -480,6 +489,11 @@ func (txn *txn) Commit() ReadTxn {
txn.Notify()
}

// Notify table initializations
for _, ch := range initChansToClose {
close(ch)
}

txn.db.metrics.WriteTxnDuration(
txn.handle,
txn.tableNames,
Expand Down

0 comments on commit f96349e

Please sign in to comment.