diff --git a/daemon/manager/manager.go b/daemon/manager/manager.go index ec7ea28c0..0ceab1369 100644 --- a/daemon/manager/manager.go +++ b/daemon/manager/manager.go @@ -105,7 +105,7 @@ func Execute(configs *options.BootstrapConfigs) error { }() cniInstaller := cni.NewInstaller(configs.BpfConfig.Mode, - configs.CniConfig.CniMountNetEtcDIR, configs.CniConfig.CniConfigName, configs.CniConfig.CniConfigChained) + configs.CniConfig.CniMountNetEtcDIR, configs.CniConfig.CniConfigName, configs.CniConfig.CniConfigChained, configs.CniConfig.ServiceAccountPath) if err := cniInstaller.Start(); err != nil { return err } diff --git a/daemon/manager/uninstall/uninstall.go b/daemon/manager/uninstall/uninstall.go index dbbfc3f0e..276c03de2 100644 --- a/daemon/manager/uninstall/uninstall.go +++ b/daemon/manager/uninstall/uninstall.go @@ -38,7 +38,7 @@ func NewCmd() *cobra.Command { return err } cniInstaller := cni.NewInstaller(configs.BpfConfig.Mode, - configs.CniConfig.CniMountNetEtcDIR, configs.CniConfig.CniConfigName, configs.CniConfig.CniConfigChained) + configs.CniConfig.CniMountNetEtcDIR, configs.CniConfig.CniConfigName, configs.CniConfig.CniConfigChained, configs.CniConfig.ServiceAccountPath) cniInstaller.Stop() return nil }, diff --git a/daemon/options/cni.go b/daemon/options/cni.go index 31aef9409..0019fd625 100644 --- a/daemon/options/cni.go +++ b/daemon/options/cni.go @@ -23,9 +23,10 @@ import ( ) type cniConfig struct { - CniMountNetEtcDIR string - CniConfigName string - CniConfigChained bool + CniMountNetEtcDIR string + CniConfigName string + CniConfigChained bool + ServiceAccountPath string } func (c *cniConfig) AttachFlags(cmd *cobra.Command) { @@ -44,5 +45,7 @@ func (c *cniConfig) ParseConfig() error { return err } + c.ServiceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount" + return nil } diff --git a/go.mod b/go.mod index 02d44c2c9..334a73958 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/containernetworking/cni v1.2.3 github.com/containernetworking/plugins v1.5.1 github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 + github.com/fsnotify/fsnotify v1.7.0 github.com/golang/protobuf v1.5.4 github.com/miekg/dns v1.1.62 github.com/prometheus/client_golang v1.20.2 @@ -78,7 +79,6 @@ require ( github.com/fatih/color v1.17.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/florianl/go-nflog/v2 v2.1.0 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-errors/errors v1.5.1 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.2 // indirect diff --git a/pkg/cni/chained.go b/pkg/cni/chained.go index 0ff7d2dc6..3f00cbe6f 100644 --- a/pkg/cni/chained.go +++ b/pkg/cni/chained.go @@ -199,7 +199,7 @@ func (i *Installer) chainedKmeshCniPlugin(mode string, cniMountNetEtcDIR string) // which may be watched by other CNIs, and so we don't want to trigger writes to this file // unless it's missing or the contents are not what we expect. kubeconfigFilepath := filepath.Join(cniMountNetEtcDIR, kmeshCniKubeConfig) - if err := maybeWriteKubeConfigFile(kubeconfigFilepath); err != nil { + if err := maybeWriteKubeConfigFile(i.ServiceAccountPath, kubeconfigFilepath); err != nil { return fmt.Errorf("write kubeconfig: %v", err) } diff --git a/pkg/cni/chained_test.go b/pkg/cni/chained_test.go index 9c8efeb45..9e1f314fa 100644 --- a/pkg/cni/chained_test.go +++ b/pkg/cni/chained_test.go @@ -204,7 +204,7 @@ func TestGetCniConfigPath(t *testing.T) { t.Run(tt.name, func(t *testing.T) { config := tt.utconfig tt.beforeFunc() - i := NewInstaller(constants.AdsMode, config.CniMountNetEtcDIR, config.CniConfigName, config.CniConfigChained) + i := NewInstaller(constants.AdsMode, config.CniMountNetEtcDIR, config.CniConfigName, config.CniConfigChained, "") _, err := i.getCniConfigPath() if (err != nil) != tt.wantErr { t.Errorf("getCniConfigPath() error = %v, wantErr %v", err, tt.wantErr) @@ -307,7 +307,7 @@ func TestInsertCNIConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.beforeFunc() - i := NewInstaller(constants.AdsMode, "", "", true) + i := NewInstaller(constants.AdsMode, "", "", true, "") _, err := i.insertCNIConfig(tt.utconfig, "workload") if (err != nil) != tt.wantErr { t.Errorf("insertCNIConfig() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/cni/install.go b/pkg/cni/install.go index bc4a71f60..101e67cec 100644 --- a/pkg/cni/install.go +++ b/pkg/cni/install.go @@ -20,6 +20,13 @@ package cni import ( + "fmt" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" + "istio.io/istio/pkg/filewatcher" + "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/logger" ) @@ -50,24 +57,70 @@ func (i *Installer) removeCniConfig() error { } type Installer struct { - Mode string - CniMountNetEtcDIR string - CniConfigName string - CniConfigChained bool + Mode string + CniMountNetEtcDIR string + CniConfigName string + CniConfigChained bool + ServiceAccountPath string + + Watcher filewatcher.FileWatcher } func NewInstaller(mode string, cniMountNetEtcDIR string, cniConfigName string, - cniConfigChained bool) *Installer { + cniConfigChained bool, + serviceAccountPath string) *Installer { return &Installer{ - Mode: mode, - CniMountNetEtcDIR: cniMountNetEtcDIR, - CniConfigName: cniConfigName, - CniConfigChained: cniConfigChained, + Mode: mode, + CniMountNetEtcDIR: cniMountNetEtcDIR, + CniConfigName: cniConfigName, + CniConfigChained: cniConfigChained, + ServiceAccountPath: serviceAccountPath, + Watcher: filewatcher.NewWatcher(), } } +func (i *Installer) WatchServiceAccountToken() error { + tokenPath := i.ServiceAccountPath + "/token" + if err := i.Watcher.Add(tokenPath); err != nil { + return fmt.Errorf("failed to add %s to file watcher: %v", tokenPath, err) + } + + // Start listening for events. + go func() { + log.Infof("start watching file %s", tokenPath) + + var timerC <-chan time.Time + for { + select { + case <-timerC: + timerC = nil + + if err := maybeWriteKubeConfigFile(i.ServiceAccountPath, filepath.Join(i.CniMountNetEtcDIR, kmeshCniKubeConfig)); err != nil { + log.Errorf("failed try to update Kmesh cni kubeconfig: %v", err) + } + + case event := <-i.Watcher.Events(tokenPath): + log.Infof("got event %s", event.String()) + + if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) { + if timerC == nil { + timerC = time.After(100 * time.Millisecond) + } + } + case err := <-i.Watcher.Errors(tokenPath): + if err != nil { + log.Errorf("error from errors channel of file watcher: %v", err) + return + } + } + } + }() + + return nil +} + func (i *Installer) Start() error { if i.Mode == constants.AdsMode || i.Mode == constants.WorkloadMode { log.Info("start write CNI config") @@ -76,7 +129,12 @@ func (i *Installer) Start() error { i.Stop() return err } + + if err := i.WatchServiceAccountToken(); err != nil { + return err + } } + return nil } @@ -86,6 +144,9 @@ func (i *Installer) Stop() { if err := i.removeCniConfig(); err != nil { log.Errorf("remove CNI config failed: %v, please remove manually", err) } + if err := i.Watcher.Close(); err != nil { + log.Errorf("failed to close fsnotify watcher: %v", err) + } log.Info("remove CNI config done") } } diff --git a/pkg/cni/install_test.go b/pkg/cni/install_test.go new file mode 100644 index 000000000..89f190628 --- /dev/null +++ b/pkg/cni/install_test.go @@ -0,0 +1,86 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cni + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "istio.io/istio/pkg/test/util/retry" + + "kmesh.net/kmesh/pkg/constants" +) + +func TestWatchTokenFile(t *testing.T) { + serviceAccountPath := t.TempDir() + os.WriteFile(filepath.Join(serviceAccountPath, "token"), []byte("faketoken"), 0o644) + os.WriteFile(filepath.Join(serviceAccountPath, "ca.crt"), []byte("fakecacert"), 0o644) + + os.Setenv("KUBERNETES_SERVICE_HOST", "10.96.0.1") + os.Setenv("KUBERNETES_SERVICE_PORT", "443") + + expectedKC, err := createKubeConfig(serviceAccountPath) + if err != nil { + t.Fatalf("failed to create expected kubeconfig: %v", err) + } + + cniDir := t.TempDir() + + i := NewInstaller(constants.WorkloadMode, cniDir, "conflist-name", true, serviceAccountPath) + defer i.Watcher.Close() + + kubeconfigPath := filepath.Join(i.CniMountNetEtcDIR, kmeshCniKubeConfig) + if err := maybeWriteKubeConfigFile(serviceAccountPath, kubeconfigPath); err != nil { + t.Fatalf("failed to write kubeconfig file: %v", err) + } + + existingKC, err := os.ReadFile(kubeconfigPath) + if err != nil { + t.Fatalf("failed to read the content of existing kubeconfig path: %v", err) + } + + if expectedKC != string(existingKC) { + t.Fatalf("existing kubeconfig\n%s\n***is NOT equal to*** expected kubeconfig\n%s\n", existingKC, expectedKC) + } + + if err := i.WatchServiceAccountToken(); err != nil { + t.Fatalf("failed to watch service account token: %v", err) + } + + os.WriteFile(filepath.Join(serviceAccountPath, "token"), []byte("updatedfaketoken"), 0o644) + + retry.UntilSuccess(func() error { + expectedKC, err = createKubeConfig(serviceAccountPath) + if err != nil { + return fmt.Errorf("failed to create expected kubeconfig after token update: %v", err) + } + + existingKC, err = os.ReadFile(kubeconfigPath) + if err != nil { + return fmt.Errorf("failed to read the content of existing kubeconfig path update token update: %v", err) + } + + if expectedKC != string(existingKC) { + return fmt.Errorf("existing kubeconfig\n%s\n***is NOT equal to*** expected kubeconfig\n%s\nafter token update", existingKC, expectedKC) + } + + return nil + }, retry.Timeout(3*time.Second)) +} diff --git a/pkg/cni/kubeconfig.go b/pkg/cni/kubeconfig.go index 531981495..c306d837a 100644 --- a/pkg/cni/kubeconfig.go +++ b/pkg/cni/kubeconfig.go @@ -43,9 +43,7 @@ import ( "kmesh.net/kmesh/pkg/utils" ) -const ServiceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount" - -func createKubeConfig() (string, error) { +func createKubeConfig(serviceAccountPath string) (string, error) { k8sServiceHost := os.Getenv("KUBERNETES_SERVICE_HOST") if len(k8sServiceHost) == 0 { return "", fmt.Errorf("KUBERNETES_SERVICE_HOST not set. Is this not running within a pod?") @@ -59,14 +57,14 @@ func createKubeConfig() (string, error) { Server: fmt.Sprintf("https://%s", net.JoinHostPort(k8sServiceHost, k8sServicePort)), } - caFile := ServiceAccountPath + "/ca.crt" + caFile := serviceAccountPath + "/ca.crt" caContents, err := os.ReadFile(caFile) if err != nil { return "", err } cluster.CertificateAuthorityData = caContents - token, err := os.ReadFile(ServiceAccountPath + "/token") + token, err := os.ReadFile(serviceAccountPath + "/token") if err != nil { return "", err } @@ -109,8 +107,8 @@ func createKubeConfig() (string, error) { } // maybeWriteKubeConfigFile will validate the existing kubeConfig file, and rewrite/replace it if required. -func maybeWriteKubeConfigFile(kubeconfigFilepath string) error { - kc, err := createKubeConfig() +func maybeWriteKubeConfigFile(serviceAccountPath, kubeconfigFilepath string) error { + kc, err := createKubeConfig(serviceAccountPath) if err != nil { return err }