Skip to content

Commit

Permalink
Merge pull request #3035 from kubernetes-sigs/backport-018-3023
Browse files Browse the repository at this point in the history
[release-0.18] 🐛 Refactor certificate watcher to use polling, instead of fsnotify
  • Loading branch information
k8s-ci-robot authored Dec 2, 2024
2 parents 318aa09 + 082726a commit 12955b3
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 128 deletions.
3 changes: 1 addition & 2 deletions examples/scratch-env/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
6 changes: 2 additions & 4 deletions examples/scratch-env/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ=
github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.22.0
require (
github.com/evanphx/json-patch v4.12.0+incompatible // Using v4 to match upstream
github.com/evanphx/json-patch/v5 v5.9.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-logr/logr v1.4.1
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.2
github.com/go-logr/zapr v1.3.0
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
Expand Down Expand Up @@ -38,7 +38,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ=
Expand Down
166 changes: 63 additions & 103 deletions pkg/certwatcher/certwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,55 @@ limitations under the License.
package certwatcher

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"os"
"sync"
"time"

"github.com/fsnotify/fsnotify"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
)

var log = logf.RuntimeLog.WithName("certwatcher")

// CertWatcher watches certificate and key files for changes. When either file
// changes, it reads and parses both and calls an optional callback with the new
// certificate.
const defaultWatchInterval = 10 * time.Second

// CertWatcher watches certificate and key files for changes.
// It always returns the cached version,
// but periodically reads and parses certificate and key for changes
// and calls an optional callback with the new certificate.
type CertWatcher struct {
sync.RWMutex

currentCert *tls.Certificate
watcher *fsnotify.Watcher
interval time.Duration

certPath string
keyPath string

cachedKeyPEMBlock []byte

// callback is a function to be invoked when the certificate changes.
callback func(tls.Certificate)
}

// New returns a new CertWatcher watching the given certificate and key.
func New(certPath, keyPath string) (*CertWatcher, error) {
var err error

cw := &CertWatcher{
certPath: certPath,
keyPath: keyPath,
interval: defaultWatchInterval,
}

// Initial read of certificate and key.
if err := cw.ReadCertificate(); err != nil {
return nil, err
}

cw.watcher, err = fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return cw, cw.ReadCertificate()
}

return cw, nil
// WithWatchInterval sets the watch interval and returns the CertWatcher pointer
func (cw *CertWatcher) WithWatchInterval(interval time.Duration) *CertWatcher {
cw.interval = interval
return cw
}

// RegisterCallback registers a callback to be invoked when the certificate changes.
Expand All @@ -91,72 +88,71 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate,

// Start starts the watch on the certificate and key files.
func (cw *CertWatcher) Start(ctx context.Context) error {
files := sets.New(cw.certPath, cw.keyPath)

{
var watchErr error
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
for _, f := range files.UnsortedList() {
if err := cw.watcher.Add(f); err != nil {
watchErr = err
return false, nil //nolint:nilerr // We want to keep trying.
}
// We've added the watch, remove it from the set.
files.Delete(f)
}
return true, nil
}); err != nil {
return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr}))
}
}

go cw.Watch()
ticker := time.NewTicker(cw.interval)
defer ticker.Stop()

log.Info("Starting certificate watcher")

// Block until the context is done.
<-ctx.Done()

return cw.watcher.Close()
}

// Watch reads events from the watcher's channel and reacts to changes.
func (cw *CertWatcher) Watch() {
for {
select {
case event, ok := <-cw.watcher.Events:
// Channel is closed.
if !ok {
return
case <-ctx.Done():
return nil
case <-ticker.C:
if err := cw.ReadCertificate(); err != nil {
log.Error(err, "failed read certificate")
}
}
}
}

cw.handleEvent(event)
// Watch used to read events from the watcher's channel and reacts to changes,
// it has currently no function and it's left here for backward compatibility until a future release.
//
// Deprecated: fsnotify has been removed and Start() is now polling instead.
func (cw *CertWatcher) Watch() {
}

case err, ok := <-cw.watcher.Errors:
// Channel is closed.
if !ok {
return
}
// updateCachedCertificate checks if the new certificate differs from the cache,
// updates it and returns the result if it was updated or not
func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool {
cw.Lock()
defer cw.Unlock()

log.Error(err, "certificate watch error")
}
if cw.currentCert != nil &&
bytes.Equal(cw.currentCert.Certificate[0], cert.Certificate[0]) &&
bytes.Equal(cw.cachedKeyPEMBlock, keyPEMBlock) {
log.V(7).Info("certificate already cached")
return false
}
cw.currentCert = cert
cw.cachedKeyPEMBlock = keyPEMBlock
return true
}

// ReadCertificate reads the certificate and key files from disk, parses them,
// and updates the current certificate on the watcher. If a callback is set, it
// and updates the current certificate on the watcher if updated. If a callback is set, it
// is invoked with the new certificate.
func (cw *CertWatcher) ReadCertificate() error {
metrics.ReadCertificateTotal.Inc()
cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath)
certPEMBlock, err := os.ReadFile(cw.certPath)
if err != nil {
metrics.ReadCertificateErrors.Inc()
return err
}
keyPEMBlock, err := os.ReadFile(cw.keyPath)
if err != nil {
metrics.ReadCertificateErrors.Inc()
return err
}

cw.Lock()
cw.currentCert = &cert
cw.Unlock()
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
metrics.ReadCertificateErrors.Inc()
return err
}

if !cw.updateCachedCertificate(&cert, keyPEMBlock) {
return nil
}

log.Info("Updated current TLS certificate")

Expand All @@ -170,39 +166,3 @@ func (cw *CertWatcher) ReadCertificate() error {
}
return nil
}

func (cw *CertWatcher) handleEvent(event fsnotify.Event) {
// Only care about events which may modify the contents of the file.
if !(isWrite(event) || isRemove(event) || isCreate(event) || isChmod(event)) {
return
}

log.V(1).Info("certificate event", "event", event)

// If the file was removed or renamed, re-add the watch to the previous name
if isRemove(event) || isChmod(event) {
if err := cw.watcher.Add(event.Name); err != nil {
log.Error(err, "error re-watching file")
}
}

if err := cw.ReadCertificate(); err != nil {
log.Error(err, "error re-reading certificate")
}
}

func isWrite(event fsnotify.Event) bool {
return event.Op.Has(fsnotify.Write)
}

func isCreate(event fsnotify.Event) bool {
return event.Op.Has(fsnotify.Create)
}

func isRemove(event fsnotify.Event) bool {
return event.Op.Has(fsnotify.Remove)
}

func isChmod(event fsnotify.Event) bool {
return event.Op.Has(fsnotify.Chmod)
}
1 change: 1 addition & 0 deletions pkg/certwatcher/certwatcher_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand Down
Loading

0 comments on commit 12955b3

Please sign in to comment.