diff --git a/.codespellrc b/.codespellrc index a0c1af15e..329143527 100644 --- a/.codespellrc +++ b/.codespellrc @@ -1,3 +1,3 @@ [codespell] skip = .git,go.sum,go.mod,*.png,*.svg -ignore-words-list = kmesh,outter,nd,Donot,donot,doesnot \ No newline at end of file +ignore-words-list = kmesh,outter,nd,Donot,donot,doesnot,Failer \ No newline at end of file diff --git a/bpf/kmesh/probes/metrics.h b/bpf/kmesh/probes/metrics.h index 3ad57ed54..fd8a7114d 100644 --- a/bpf/kmesh/probes/metrics.h +++ b/bpf/kmesh/probes/metrics.h @@ -9,6 +9,8 @@ struct metric_key { struct ip_addr src_ip; struct ip_addr dst_ip; + __u32 direction; + __u32 dst_port; }; struct metric_data { @@ -34,20 +36,34 @@ struct { __uint(max_entries, RINGBUF_SIZE); } map_of_metric_notify SEC(".maps"); -static inline void construct_metric_key(struct bpf_sock *sk, struct metric_key *key) +static inline void construct_metric_key(struct bpf_sock *sk, __u8 direction, struct metric_key *key) { bpf_memset(key, 0, sizeof(struct metric_key)); - if (sk->family == AF_INET) { - key->src_ip.ip4 = sk->src_ip4; - key->dst_ip.ip4 = sk->dst_ip4; + + key->direction = direction; + if (direction == OUTBOUND) { + if (sk->family == AF_INET) { + key->src_ip.ip4 = sk->src_ip4; + key->dst_ip.ip4 = sk->dst_ip4; + } else { + bpf_memcpy(key->src_ip.ip6, sk->src_ip6, IPV6_ADDR_LEN); + bpf_memcpy(key->dst_ip.ip6, sk->dst_ip6, IPV6_ADDR_LEN); + } + key->dst_port = bpf_ntohl(sk->dst_port); } else { - bpf_memcpy(key->src_ip.ip6, sk->src_ip6, IPV6_ADDR_LEN); - bpf_memcpy(key->dst_ip.ip6, sk->dst_ip6, IPV6_ADDR_LEN); + if (sk->family == AF_INET) { + key->src_ip.ip4 = sk->dst_ip4; + key->dst_ip.ip4 = sk->src_ip4; + } else { + bpf_memcpy(key->src_ip.ip6, sk->dst_ip6, IPV6_ADDR_LEN); + bpf_memcpy(key->dst_ip.ip6, sk->src_ip6, IPV6_ADDR_LEN); + } + key->dst_port = sk->src_port; } return; } -static inline void report_metrics(struct bpf_sock *sk) +static inline void report_metrics(struct metric_key *mk) { struct metric_key *key = bpf_ringbuf_reserve(&map_of_metric_notify, sizeof(struct metric_key), 0); if (!key) { @@ -55,7 +71,7 @@ static inline void report_metrics(struct bpf_sock *sk) return; } - construct_metric_key(sk, key); + bpf_memcpy(key, mk, sizeof(struct metric_key)); bpf_ringbuf_submit(key, 0); return; } @@ -67,7 +83,7 @@ metric_on_connect(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct soc struct metric_data data = {0}; struct metric_data *metric = NULL; - construct_metric_key(sk, &key); + construct_metric_key(sk, storage->direction, &key); metric = (struct metric_data *)bpf_map_lookup_elem(&map_of_metrics, &key); if (!metric) { data.conn_open++; @@ -83,7 +99,7 @@ metric_on_connect(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct soc metric->conn_open++; metric->direction = storage->direction; notify: - report_metrics(sk); + report_metrics(&key); return; } @@ -94,7 +110,7 @@ metric_on_close(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct sock_ struct metric_data data = {0}; struct metric_data *metric = NULL; - construct_metric_key(sk, &key); + construct_metric_key(sk, storage->direction, &key); metric = (struct metric_data *)bpf_map_lookup_elem(&map_of_metrics, &key); if (!metric) { // connect failed @@ -113,7 +129,7 @@ metric_on_close(struct bpf_sock *sk, struct bpf_tcp_sock *tcp_sock, struct sock_ metric->sent_bytes += tcp_sock->delivered; metric->received_bytes += tcp_sock->bytes_received; notify: - report_metrics(sk); + report_metrics(&key); return; } diff --git a/docs/kmesh_compile-zh.md b/docs/kmesh_compile-zh.md index 
4fd3ed741..3f55a6189 100644 --- a/docs/kmesh_compile-zh.md +++ b/docs/kmesh_compile-zh.md @@ -19,7 +19,7 @@ Kmesh需要在拥有Kmesh内核增强特性的Linux环境中编译构建。当 注意:kmesh-build镜像需要和源码版本相匹配 ```bash - docker pull ghcr.io/kmesh-net/kmesh-build-x86:latest + docker pull ghcr.io/kmesh-net/kmesh-build:latest ``` ### 源码编译 diff --git a/docs/kmesh_compile.md b/docs/kmesh_compile.md index c22b6b079..488026adb 100644 --- a/docs/kmesh_compile.md +++ b/docs/kmesh_compile.md @@ -19,7 +19,7 @@ The Kmesh needs to be compiled and built in the Linux environment with the Kmesh Note: The `kmesh-build` image needs to match the version of the source code. ``` - docker pull ghcr.io/kmesh-net/kmesh-build-x86:latest + docker pull ghcr.io/kmesh-net/kmesh-build:latest ``` ### build from source diff --git a/docs/kmesh_deploy_and_develop_in_kind.md b/docs/kmesh_deploy_and_develop_in_kind.md index e0c65f353..da7b19431 100644 --- a/docs/kmesh_deploy_and_develop_in_kind.md +++ b/docs/kmesh_deploy_and_develop_in_kind.md @@ -79,7 +79,7 @@ You can follow the steps below to develop in kind: + Build your docker image locally: ```shell - docker build --build-arg arch=amd64 -f build/docker/kmesh.dockerfile -t $image_name . + docker build -f build/docker/kmesh.dockerfile -t $image_name . ``` You should specify the `image_name`. diff --git a/go.mod b/go.mod index a5af838d0..28b821d35 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/agiledragon/gomonkey/v2 v2.12.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/cilium/ebpf v0.15.0 - github.com/containernetworking/cni v1.2.2 + github.com/containernetworking/cni v1.2.3 github.com/containernetworking/plugins v1.5.1 github.com/envoyproxy/go-control-plane v0.12.1-0.20240614044803-82e2a76dbddd github.com/golang/protobuf v1.5.4 diff --git a/go.sum b/go.sum index 098ac3825..f430ccd88 100644 --- a/go.sum +++ b/go.sum @@ -101,8 +101,8 @@ github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1Ig github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU= github.com/containerd/stargz-snapshotter/estargz v0.15.1/go.mod h1:gr2RNwukQ/S9Nv33Lt6UC7xEx58C+LHRdoqbEKjz1Kk= -github.com/containernetworking/cni v1.2.2 h1:9IbP6KJQQxVKo4hhnm8r50YcVKrJbJu3Dqw+Rbt1vYk= -github.com/containernetworking/cni v1.2.2/go.mod h1:DuLgF+aPd3DzcTQTtp/Nvl1Kim23oFKdm2okJzBQA5M= +github.com/containernetworking/cni v1.2.3 h1:hhOcjNVUQTnzdRJ6alC5XF+wd9mfGIUaj8FuJbEslXM= +github.com/containernetworking/cni v1.2.3/go.mod h1:DuLgF+aPd3DzcTQTtp/Nvl1Kim23oFKdm2okJzBQA5M= github.com/containernetworking/plugins v1.5.1 h1:T5ji+LPYjjgW0QM+KyrigZbLsZ8jaX+E5J/EcKOE4gQ= github.com/containernetworking/plugins v1.5.1/go.mod h1:MIQfgMayGuHYs0XdNudf31cLLAC+i242hNm6KuDGqCM= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= diff --git a/pkg/cni/plugin/plugin.go b/pkg/cni/plugin/plugin.go index c3115ae36..35c33f23d 100644 --- a/pkg/cni/plugin/plugin.go +++ b/pkg/cni/plugin/plugin.go @@ -20,8 +20,6 @@ import ( "context" "encoding/json" "fmt" - "strconv" - "strings" "github.com/cilium/ebpf" "github.com/containernetworking/cni/pkg/skel" @@ -30,14 +28,10 @@ import ( "github.com/containernetworking/cni/pkg/version" netns "github.com/containernetworking/plugins/pkg/ns" "github.com/vishvananda/netlink" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - 
"k8s.io/client-go/kubernetes" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/logger" - "kmesh.net/kmesh/pkg/nets" "kmesh.net/kmesh/pkg/utils" ) @@ -86,100 +80,6 @@ func parseSkelArgs(args *skel.CmdArgs) (*cniConf, *k8sArgs, *cniv1.Result, error return &cniConf, &k8sCommonArgs, result, nil } -// checkKmesh checks whether we should enable kmesh for the given pod -func checkKmesh(client kubernetes.Interface, pod *v1.Pod) (bool, error) { - namespace, err := client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{}) - if err != nil { - return false, err - } - var enableSidecar bool - injectLabel := namespace.Labels["istio-injection"] - if injectLabel == "enabled" { - enableSidecar = true - } - // According to istio, it support per pod config. - injValue := pod.Annotations["sidecar.istio.io/inject"] - if v, ok := pod.Labels["sidecar.istio.io/inject"]; ok { - injValue = v - } - if inject, err := strconv.ParseBool(injValue); err == nil { - enableSidecar = inject - } - - // If sidecar inject enabled, kmesh do not take charge of it. - if enableSidecar { - return false, nil - } - - // Exclude istio managed gateway - if gateway, ok := pod.Labels["gateway.istio.io/managed"]; ok { - if strings.EqualFold(gateway, "istio.io-mesh-controller") { - return false, nil - } - } - - mode := namespace.Labels[constants.DataPlaneModeLabel] - if strings.EqualFold(mode, constants.DataPlaneModeKmesh) { - return true, nil - } - - return false, nil -} - -func disableKmeshControl(ns string) error { - if ns == "" { - return nil - } - - execFunc := func(netns.NetNS) error { - /* - * Attempt to connect to a special IP address. The - * connection triggers the cgroup/connect4/6 ebpf - * program and records the netns cookie information - * of the current connection. The cookie can be used - * to determine whether the netns is managed by Kmesh. - * ControlCommandIp4/6:930(0x3a2) is "cipher key" for cgroup/connect4/6 - * ebpf program disable kmesh control - */ - return nets.TriggerControlCommand(constants.OperDisableControl) - } - - if err := netns.WithNetNSPath(ns, execFunc); err != nil { - err = fmt.Errorf("enter ns path :%v, run execFunc failed: %v", ns, err) - return err - } - return nil -} - -func enableKmeshControl(ns string) error { - execFunc := func(netns.NetNS) error { - /* - * Attempt to connect to a special IP address. The - * connection triggers the cgroup/connect4/6 ebpf - * program and records the netns cookie information - * of the current connection. The cookie can be used - * to determine whether the netns is managed by Kmesh. - * ControlCommandIp4/6:929(0x3a1) is "cipher key" for cgroup/connect4/6 - * ebpf program. 
- */ - return nets.TriggerControlCommand(constants.OperEnableControl) - } - - if err := netns.WithNetNSPath(ns, execFunc); err != nil { - err = fmt.Errorf("enter ns path :%v, run execFunc failed: %v", ns, err) - return err - } - return nil -} - -const KmeshRedirection = "kmesh.net/redirection" - -var annotationPatch = []byte(fmt.Sprintf( - `{"metadata":{"annotations":{"%s":"%s"}}}`, - KmeshRedirection, - "enabled", -)) - func getPrevCniResult(conf *cniConf) (*cniv1.Result, error) { var err error if conf.RawPrevResult == nil { @@ -228,17 +128,6 @@ func enableXdpAuth(ifname string) error { return nil } -func patchKmeshAnnotation(client kubernetes.Interface, pod *v1.Pod) error { - _, err := client.CoreV1().Pods(pod.Namespace).Patch( - context.Background(), - pod.Name, - k8stypes.MergePatchType, - annotationPatch, - metav1.PatchOptions{}, - ) - return err -} - // if cmdadd failed, then we cannot return failed, do nothing and print pre result func CmdAdd(args *skel.CmdArgs) error { var err error @@ -271,22 +160,23 @@ func CmdAdd(args *skel.CmdArgs) error { return err } - enableKmesh, err := checkKmesh(client, pod) + namespace, err := client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{}) if err != nil { - log.Errorf("failed to check enable kmesh information: %v", err) - return err + return fmt.Errorf("failed to get namespace %s: %v", pod.Namespace, err) } + enableKmesh := utils.ShouldEnroll(pod, namespace) + if !enableKmesh { return types.PrintResult(preResult, cniConf.CNIVersion) } - if err := enableKmeshControl(args.Netns); err != nil { + if err := utils.HandleKmeshManage(args.Netns, true); err != nil { log.Errorf("failed to enable kmesh control, err is %v", err) return err } - if err := patchKmeshAnnotation(client, pod); err != nil { + if err := utils.PatchKmeshRedirectAnnotation(client, pod); err != nil { log.Errorf("failed to annotate kmesh redirection, err is %v", err) } @@ -314,7 +204,7 @@ func CmdCheck(args *skel.CmdArgs) (err error) { func CmdDelete(args *skel.CmdArgs) error { // clean - if err := disableKmeshControl(args.Netns); err != nil { + if err := utils.HandleKmeshManage(args.Netns, false); err != nil { log.Errorf("failed to disable Kmesh control, err: %v", err) } diff --git a/pkg/cni/plugin/plugin_test.go b/pkg/cni/plugin/plugin_test.go deleted file mode 100644 index c751c4bc6..000000000 --- a/pkg/cni/plugin/plugin_test.go +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright The Kmesh Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - * Author: bitcoffee - * Create: 2023-11-19 - */ - -package plugin - -import ( - "testing" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" -) - -func TestCheckKmesh(t *testing.T) { - type args struct { - client kubernetes.Interface - pod *corev1.Pod - } - tests := []struct { - name string - args args - want bool - wantErr bool - }{ - { - name: "test1: namespace with istio-injection=enabled, pod with sidecar inject annotation, should return false", - args: args{ - client: fake.NewSimpleClientset(&corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "utNs", - Labels: map[string]string{ - "istio-injection": "ebable", - }, - }, - }), - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "utPod", - Namespace: "utNs", - Annotations: map[string]string{ - "sidecar.istio.io/inject": "true", - }, - }, - }, - }, - want: false, - wantErr: false, - }, { - name: "test2: namespace with dataplane-mode=Kmesh, pod without sidecar inject annotation, should return true", - args: args{ - client: fake.NewSimpleClientset(&corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "utNs", - Labels: map[string]string{ - "istio.io/dataplane-mode": "kmesh", - }, - }, - }), - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "utPod", - Namespace: "utNs", - }, - }, - }, - want: true, - wantErr: false, - }, { - name: "test: namespace not found, should return error", - args: args{ - client: fake.NewSimpleClientset(&corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "otherNs", - }, - }), - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "utPod", - Namespace: "utNs", - Labels: map[string]string{ - "istio.io/dataplane-mode": "Kmesh", - }, - }, - }, - }, - want: false, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := checkKmesh(tt.args.client, tt.args.pod) - if (err != nil) != tt.wantErr { - t.Errorf("checkKmesh() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("checkKmesh() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 84d737cf7..58ab10d1d 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -43,8 +43,6 @@ const ( // Oper code for control command OperEnableControl = 929 OperDisableControl = 930 - OperEnableBypass = 931 - OperDisableByPass = 932 // tail call index in tail call prog map TailCallConnect4Index = 0 diff --git a/pkg/controller/bypass/bypass_controller.go b/pkg/controller/bypass/bypass_controller.go index 8c654555f..186514dbc 100644 --- a/pkg/controller/bypass/bypass_controller.go +++ b/pkg/controller/bypass/bypass_controller.go @@ -17,23 +17,21 @@ package bypass import ( - "context" "fmt" "os" + "time" netns "github.com/containernetworking/plugins/pkg/ns" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "kmesh.net/kmesh/pkg/constants" ns "kmesh.net/kmesh/pkg/controller/netns" "kmesh.net/kmesh/pkg/logger" - "kmesh.net/kmesh/pkg/nets" "kmesh.net/kmesh/pkg/utils" + "kmesh.net/kmesh/pkg/utils/istio" ) var ( @@ -41,54 +39,46 @@ var ( ) const ( - SidecarAnnotation = "sidecar.istio.io/inject" + DefaultInformerSyncPeriod = 30 * time.Second + ByPassLabel = "kmesh.net/bypass" + ByPassValue = "enabled" ) -func 
StartByPassController(client kubernetes.Interface, stopChan <-chan struct{}) error { - nodeName := os.Getenv("NODE_NAME") +type Controller struct { + pod cache.SharedIndexInformer + informerFactory informers.SharedInformerFactory +} - informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0, +func NewByPassController(client kubernetes.Interface) *Controller { + nodeName := os.Getenv("NODE_NAME") + informerFactory := informers.NewSharedInformerFactoryWithOptions(client, DefaultInformerSyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeName) })) - informerFactory.Start(wait.NeverStop) - informerFactory.WaitForCacheSync(wait.NeverStop) - podInformer := informerFactory.Core().V1().Pods().Informer() - - if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, _ = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod, ok := obj.(*corev1.Pod) if !ok { log.Errorf("expected *corev1.Pod but got %T", obj) return } - if !shouldEnroll(pod) { + if !istio.PodHasSidecar(pod) { + log.Infof("pod %s/%s does not have sidecar injected, skip", pod.GetNamespace(), pod.GetName()) return } - log.Infof("%s/%s: enable bypass control", pod.GetNamespace(), pod.GetName()) - enableSidecar, _ := checkSidecar(client, pod) - enableKmesh := isKmeshManaged(pod) - if !enableSidecar && !enableKmesh { - log.Info("do not need process, pod is not managed by sidecar or kmesh") + if !shouldBypass(pod) { + // TODO: add delete iptables in case we missed skip bypass during kmesh restart return } + log.Debugf("%s/%s: enable bypass control", pod.GetNamespace(), pod.GetName()) nspath, _ := ns.GetPodNSpath(pod) - - if enableSidecar { - if err := addIptables(nspath); err != nil { - log.Errorf("failed to add iptables rules for %s: %v", nspath, err) - return - } - } - if enableKmesh { - if err := handleKmeshBypass(nspath, 1); err != nil { - log.Errorf("failed to enable bypass control") - return - } + if err := addIptables(nspath); err != nil { + log.Errorf("failed to add iptables rules for %s: %v", nspath, err) + return } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -104,75 +94,57 @@ func StartByPassController(client kubernetes.Interface, stopChan <-chan struct{} return } - if shouldEnroll(oldPod) && !shouldEnroll(newPod) { - log.Infof("%s/%s: disable bypass control", newPod.GetNamespace(), newPod.GetName()) - enableSidecar, _ := checkSidecar(client, newPod) - enableKmesh := isKmeshManaged(newPod) - - if enableSidecar { - nspath, _ := ns.GetPodNSpath(newPod) - if err := deleteIptables(nspath); err != nil { - log.Errorf("failed to add iptables rules for %s: %v", nspath, err) - return - } + if !istio.PodHasSidecar(newPod) { + log.Debugf("pod %s/%s does not have a sidecar", newPod.GetNamespace(), newPod.GetName()) + return + } + + if shouldBypass(oldPod) && !shouldBypass(newPod) { + log.Debugf("%s/%s: restore sidecar control", newPod.GetNamespace(), newPod.GetName()) + nspath, _ := ns.GetPodNSpath(newPod) + if err := deleteIptables(nspath); err != nil { + log.Errorf("failed to delete iptables rules for %s: %v", nspath, err) + return } - if enableKmesh { - nspath, _ := ns.GetPodNSpath(newPod) - if err := handleKmeshBypass(nspath, 0); err != nil { - log.Errorf("failed to disable bypass control") - return - } + } + if !shouldBypass(oldPod) && shouldBypass(newPod) { + log.Debugf("%s/%s: enable bypass control", newPod.GetNamespace(), newPod.GetName()) + nspath, _ := 
ns.GetPodNSpath(newPod) + if err := addIptables(nspath); err != nil { + log.Errorf("failed to add iptables rules for %s: %v", nspath, err) + return } } }, - // We do not need to process delete here, because in bpf mode, it will be handled by kmesh-cni. - // In istio sidecar mode, we do not need to delete the iptables. - }); err != nil { - return fmt.Errorf("error adding event handler to podInformer: %v", err) - } + // We do not need to process delete here, because + // in istio sidecar mode, we do not need to delete the iptables. + }) - go podInformer.Run(stopChan) - - return nil -} + c := &Controller{ + informerFactory: informerFactory, + pod: podInformer, + } -func shouldEnroll(pod *corev1.Pod) bool { - enabled := pod.Labels["kmesh.net/bypass"] - return enabled == "enabled" + return c } -func handleKmeshBypass(ns string, oper int) error { - execFunc := func(netns.NetNS) error { - /* - * This function is used to process pods that are marked - * or deleted with the bypass label on the current node. - * Attempt to connect to a special IP address. The - * connection triggers the cgroup/connect4/6 ebpf - * program and records the netns cookie information - * of the current connection. The cookie can be used - * to determine whether the pod is been bypass. - * ControlCommandIp4/6: is "cipher key" for cgroup/connect4/6 - * ebpf program. 931/932 is the specific port handled by - * daemon to enable/disable bypass - */ - port := constants.OperEnableBypass - if oper == 0 { - port = constants.OperDisableByPass - } - return nets.TriggerControlCommand(port) +func (c *Controller) Run(stop <-chan struct{}) { + c.informerFactory.Start(stop) + if !cache.WaitForCacheSync(stop, c.pod.HasSynced) { + log.Error("failed to wait pod cache sync") } +} - if err := netns.WithNetNSPath(ns, execFunc); err != nil { - err = fmt.Errorf("enter ns path :%v, run execFunc failed: %v", ns, err) - return err - } - return nil +// checks whether there is a bypass label +func shouldBypass(pod *corev1.Pod) bool { + return pod.Labels[ByPassLabel] == ByPassValue } func isPodBeingDeleted(pod *corev1.Pod) bool { return pod.ObjectMeta.DeletionTimestamp != nil } +// TODO: make it a idempotent operation func addIptables(ns string) error { iptArgs := [][]string{ {"-t", "nat", "-I", "PREROUTING", "1", "-j", "RETURN"}, @@ -194,6 +166,7 @@ func addIptables(ns string) error { return nil } +// TODO: make it a idempotent operation func deleteIptables(ns string) error { iptArgs := [][]string{ {"-t", "nat", "-D", "PREROUTING", "-j", "RETURN"}, @@ -217,30 +190,3 @@ func deleteIptables(ns string) error { } return nil } - -func checkSidecar(client kubernetes.Interface, pod *corev1.Pod) (bool, error) { - namespace, err := client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{}) - if err != nil { - return false, err - } - - if value, ok := namespace.Labels["istio-injection"]; ok && value == "enabled" { - return true, nil - } - - if _, ok := pod.Annotations[SidecarAnnotation]; ok { - return true, nil - } - - return false, nil -} - -func isKmeshManaged(pod *corev1.Pod) bool { - annotations := pod.Annotations - if annotations != nil { - if value, ok := annotations[constants.KmeshRedirectionAnnotation]; ok && value == "enabled" { - return true - } - } - return false -} diff --git a/pkg/controller/bypass/bypass_test.go b/pkg/controller/bypass/bypass_test.go index cfd2eddcf..ad178ff73 100644 --- a/pkg/controller/bypass/bypass_test.go +++ b/pkg/controller/bypass/bypass_test.go @@ -20,88 +20,18 @@ import ( "context" "os" "sync" + 
"sync/atomic" "testing" "github.com/agiledragon/gomonkey/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "istio.io/api/annotation" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" ) -func TestPodKmeshLabelChangeTriggersByPassKmeshAction(t *testing.T) { - client := fake.NewSimpleClientset() - - err := os.Setenv("NODE_NAME", "test_node") - require.NoError(t, err) - t.Cleanup(func() { - os.Unsetenv("NODE_NAME") - }) - stopCh := make(chan struct{}) - err = StartByPassController(client, stopCh) - if err != nil { - t.Fatalf("error creating ByPassController: %v", err) - } - - var mu sync.Mutex - enabled := false - disabled := false - - var wg sync.WaitGroup - - patches := gomonkey.NewPatches() - defer patches.Reset() - - patches.ApplyFunc(handleKmeshBypass, func(ns string, op int) error { - mu.Lock() - defer mu.Unlock() - if op == 1 { - enabled = true - } else { - disabled = true - } - // Signal that handleKmeshBypass has been called - wg.Done() - return nil - }) - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - Labels: map[string]string{ - "kmesh.net/bypass": "enabled", - }, - Annotations: map[string]string{ - "kmesh.net/redirection": "enabled", - }, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - - wg.Add(1) - _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}) - assert.NoError(t, err) - - wg.Wait() - assert.Equal(t, true, enabled, "unexpected value for enabled flag") - assert.Equal(t, false, disabled, "unexpected value for disabled flag") - - enabled = false - disabled = false - - delete(pod.Labels, "kmesh.net/bypass") - wg.Add(1) - _, err = client.CoreV1().Pods("default").Update(context.TODO(), pod, metav1.UpdateOptions{}) - assert.NoError(t, err) - - wg.Wait() - assert.Equal(t, true, disabled, "unexpected value for enabled flag") -} - func TestPodSidecarLabelChangeTriggersAddIptablesAction(t *testing.T) { client := fake.NewSimpleClientset() @@ -111,14 +41,10 @@ func TestPodSidecarLabelChangeTriggersAddIptablesAction(t *testing.T) { os.Unsetenv("NODE_NAME") }) stopCh := make(<-chan struct{}) - err = StartByPassController(client, stopCh) - if err != nil { - t.Fatalf("error creating ByPassController: %v", err) - } - - var mu sync.Mutex - enabled := false - disabled := false + c := NewByPassController(client) + go c.Run(stopCh) + enabled := atomic.Bool{} + disabled := atomic.Bool{} var wg sync.WaitGroup @@ -126,9 +52,7 @@ func TestPodSidecarLabelChangeTriggersAddIptablesAction(t *testing.T) { defer patches1.Reset() patches1.ApplyFunc(addIptables, func(ns string) error { - mu.Lock() - defer mu.Unlock() - enabled = true + enabled.Store(true) // Signal that addIptables has been called wg.Done() return nil @@ -138,9 +62,7 @@ func TestPodSidecarLabelChangeTriggersAddIptablesAction(t *testing.T) { defer patches2.Reset() patches2.ApplyFunc(deleteIptables, func(ns string) error { - mu.Lock() - defer mu.Unlock() - disabled = true + disabled.Store(true) // Signal that addIptables has been called wg.Done() return nil @@ -158,12 +80,33 @@ func TestPodSidecarLabelChangeTriggersAddIptablesAction(t *testing.T) { _, err = client.CoreV1().Namespaces().Create(context.TODO(), namespace, metav1.CreateOptions{}) require.NoError(t, err) - pod := &corev1.Pod{ + podWithBypassButNoSidecar := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-no-sidecar", + Namespace: namespaceName, + Labels: 
map[string]string{ + ByPassLabel: ByPassValue, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + + _, err = client.CoreV1().Pods(namespaceName).Create(context.TODO(), podWithBypassButNoSidecar, metav1.CreateOptions{}) + assert.NoError(t, err) + assert.Equal(t, false, enabled.Load(), "unexpected value for enabled flag") + assert.Equal(t, false, disabled.Load(), "unexpected value for disabled flag") + + podWithBypass := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", Namespace: namespaceName, Labels: map[string]string{ - "kmesh.net/bypass": "enabled", + ByPassLabel: ByPassValue, + }, + Annotations: map[string]string{ + annotation.SidecarStatus.Name: "placeholder", }, }, Spec: corev1.PodSpec{ @@ -172,21 +115,34 @@ func TestPodSidecarLabelChangeTriggersAddIptablesAction(t *testing.T) { } wg.Add(1) - _, err = client.CoreV1().Pods(namespaceName).Create(context.TODO(), pod, metav1.CreateOptions{}) + _, err = client.CoreV1().Pods(namespaceName).Create(context.TODO(), podWithBypass, metav1.CreateOptions{}) assert.NoError(t, err) - wg.Wait() - assert.Equal(t, true, enabled, "unexpected value for enabled flag") + assert.Equal(t, true, enabled.Load(), "unexpected value for enabled flag") + assert.Equal(t, false, disabled.Load(), "unexpected value for disabled flag") - enabled = false - disabled = false + enabled.Store(false) + disabled.Store(false) - newPod := pod.DeepCopy() - delete(newPod.Labels, "kmesh.net/bypass") + // Update pod by removing the bypass label + newPod := podWithBypass.DeepCopy() + delete(newPod.Labels, ByPassLabel) + wg.Add(1) + _, err = client.CoreV1().Pods(namespaceName).Update(context.TODO(), newPod, metav1.UpdateOptions{}) + assert.NoError(t, err) + wg.Wait() + assert.Equal(t, false, enabled.Load(), "unexpected value for enabled flag") + assert.Equal(t, true, disabled.Load(), "unexpected value for disabled flag") + + enabled.Store(false) + disabled.Store(false) + // Update pod by adding the bypass label + newPod = podWithBypass.DeepCopy() + newPod.Labels[ByPassLabel] = ByPassValue wg.Add(1) _, err = client.CoreV1().Pods(namespaceName).Update(context.TODO(), newPod, metav1.UpdateOptions{}) assert.NoError(t, err) - wg.Wait() - assert.Equal(t, true, disabled, "unexpected value for enabled flag") + assert.Equal(t, true, enabled.Load(), "unexpected value for enabled flag") + assert.Equal(t, false, disabled.Load(), "unexpected value for disabled flag") } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f2eaa410e..b4d9768b5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -76,16 +76,12 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("failed to start kmesh manage controller: %v", err) } - kmeshManageController.Run(stopCh) - + go kmeshManageController.Run(stopCh) log.Info("start kmesh manage controller successfully") if c.enableByPass { - err = bypass.StartByPassController(clientset, stopCh) - if err != nil { - return fmt.Errorf("failed to start bypass controller: %v", err) - } - + c := bypass.NewByPassController(clientset) + go c.Run(stopCh) log.Info("start bypass controller successfully") } diff --git a/pkg/controller/manage/kmesh_manage_test.go b/pkg/controller/manage/kmesh_manage_test.go deleted file mode 100644 index 630b4fab1..000000000 --- a/pkg/controller/manage/kmesh_manage_test.go +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Copyright The Kmesh Authors. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kmeshmanage - -import ( - "context" - "os" - "reflect" - "sync" - "testing" - - "github.com/agiledragon/gomonkey/v2" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" - - "kmesh.net/kmesh/pkg/constants" - kmeshsecurity "kmesh.net/kmesh/pkg/controller/security" -) - -func waitAndCheckManageAction(t *testing.T, wg *sync.WaitGroup, mu *sync.Mutex, enabled *bool, disabled *bool, enableExpected bool, disableExpected bool) { - wg.Wait() - mu.Lock() - defer mu.Unlock() - assert.Equal(t, enableExpected, *enabled, "unexpected value for enabled flag") - assert.Equal(t, disableExpected, *disabled, "unexpected value for disabled flag") -} - -func TestPodWithLabelChangeTriggersManageAction(t *testing.T) { - client := fake.NewSimpleClientset() - - err := os.Setenv("NODE_NAME", "test_node") - require.NoError(t, err) - t.Cleanup(func() { - os.Unsetenv("NODE_NAME") - }) - controller, err := NewKmeshManageController(client, nil) - if err != nil { - t.Fatalf("error creating KmeshManageController: %v", err) - } - - stopChan := make(chan struct{}) - defer close(stopChan) - - controller.Run(stopChan) - cache.WaitForCacheSync(stopChan, controller.podInformer.HasSynced) - - var mu sync.Mutex - enabled := false - disabled := false - - // Create a WaitGroup to synchronize the test - var wg sync.WaitGroup - - patches := gomonkey.NewPatches() - defer patches.Reset() - - patches.ApplyFunc(handleKmeshManage, func(ns string, op bool) error { - mu.Lock() - defer mu.Unlock() - if op { - enabled = true - } else { - disabled = true - } - // Signal that handleKmeshManage has been called - wg.Done() - return nil - }) - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - Labels: map[string]string{constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh}, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - - wg.Add(1) - _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}) - assert.NoError(t, err) - - waitAndCheckManageAction(t, &wg, &mu, &enabled, &disabled, true, false) - - enabled = false - disabled = false - - delete(pod.Labels, constants.DataPlaneModeLabel) - wg.Add(1) - _, err = client.CoreV1().Pods("default").Update(context.TODO(), pod, metav1.UpdateOptions{}) - assert.NoError(t, err) - - waitAndCheckManageAction(t, &wg, &mu, &enabled, &disabled, false, true) -} - -func TestPodWithoutLabelTriggersManageAction(t *testing.T) { - client := fake.NewSimpleClientset() - - err := os.Setenv("NODE_NAME", "test_node") - require.NoError(t, err) - t.Cleanup(func() { - os.Unsetenv("NODE_NAME") - }) - controller, err := NewKmeshManageController(client, nil) - if err != nil { - t.Fatalf("error creating KmeshManageController: %v", err) - } - - stopChan := 
make(chan struct{}) - defer close(stopChan) - - controller.Run(stopChan) - cache.WaitForCacheSync(stopChan, controller.podInformer.HasSynced) - - var mu sync.Mutex - enabled := false - disabled := false - - // Create a WaitGroup to synchronize the test - var wg sync.WaitGroup - - patches := gomonkey.NewPatches() - defer patches.Reset() - - patches.ApplyFunc(handleKmeshManage, func(ns string, op bool) error { - mu.Lock() - defer mu.Unlock() - if op { - enabled = true - } else { - disabled = true - } - // Signal that handleKmeshManage has been called - wg.Done() - return nil - }) - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - Labels: map[string]string{}, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - - _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}) - assert.NoError(t, err) - - enabled = false - disabled = false - - pod.Labels[constants.DataPlaneModeLabel] = constants.DataPlaneModeKmesh - wg.Add(1) - _, err = client.CoreV1().Pods("default").Update(context.TODO(), pod, metav1.UpdateOptions{}) - assert.NoError(t, err) - - waitAndCheckManageAction(t, &wg, &mu, &enabled, &disabled, true, false) -} - -func TestPodDeleteTriggersManageAction(t *testing.T) { - isSend := false - // Create a WaitGroup to synchronize the test - var wg sync.WaitGroup - client := fake.NewSimpleClientset() - - err := os.Setenv("NODE_NAME", "test_node") - require.NoError(t, err) - t.Cleanup(func() { - os.Unsetenv("NODE_NAME") - }) - - SecretManager := &kmeshsecurity.SecretManager{} - patches1 := gomonkey.NewPatches() - patches1.ApplyMethod(reflect.TypeOf(SecretManager), "SendCertRequest", func(_ *kmeshsecurity.SecretManager, identity string, op int) { - isSend = true - wg.Done() - }) - defer patches1.Reset() - - controller, err := NewKmeshManageController(client, SecretManager) - if err != nil { - t.Fatalf("error creating KmeshManageController: %v", err) - } - - stopChan := make(chan struct{}) - defer close(stopChan) - - controller.Run(stopChan) - cache.WaitForCacheSync(stopChan, controller.podInformer.HasSynced) - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - Labels: map[string]string{constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh}, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - wg.Add(1) - _, err = client.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}) - assert.NoError(t, err) - err = client.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) - assert.NoError(t, err) - wg.Wait() - assert.Equal(t, true, isSend, "unexpected value for enabled flag") -} - -func Test_shouldEnroll(t *testing.T) { - type args struct { - namespace *corev1.Namespace - client kubernetes.Interface - pod *corev1.Pod - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "pod managed by Kmesh", - args: args{ - client: fake.NewSimpleClientset(), - namespace: &corev1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "ut-test", - }, - }, - pod: &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ut-test", - Name: "ut-pod", - Labels: map[string]string{ - constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, - }, - }, - }, - }, - want: true, - }, - { - name: "pod not managed by Kmesh", - args: args{ - client: 
fake.NewSimpleClientset(), - namespace: &corev1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "ut-test", - }, - }, - pod: &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ut-test", - Name: "ut-pod", - }, - }, - }, - want: false, - }, - { - name: "pod in namespace should managed by Kmesh", - args: args{ - client: fake.NewSimpleClientset(), - namespace: &corev1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "ut-test", - Labels: map[string]string{ - constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, - }, - }, - }, - pod: &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ut-test", - Name: "ut-pod", - }, - }, - }, - want: true, - }, - { - name: "pod in namespace should not managed by Kmesh", - args: args{ - client: fake.NewSimpleClientset(), - namespace: &corev1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "ut-test", - }, - }, - pod: &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ut-test", - Name: "ut-pod", - }, - }, - }, - want: false, - }, - { - name: "waypoint should not managed by Kmesh", - args: args{ - client: fake.NewSimpleClientset(), - namespace: &corev1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "ut-test", - Annotations: map[string]string{ - constants.KmeshRedirectionAnnotation: "enable", - }, - }, - }, - pod: &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ut-test", - Name: "ut-waypoint", - }, - }, - }, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, nsErr := tt.args.client.CoreV1().Namespaces().Create(context.TODO(), tt.args.namespace, metav1.CreateOptions{}) - assert.NoError(t, nsErr, "create test namespace failed") - _, podErr := tt.args.client.CoreV1().Pods("ut-test").Create(context.TODO(), tt.args.pod, metav1.CreateOptions{}) - assert.NoError(t, podErr, "create test pod failed") - - if got := shouldEnroll(tt.args.client, tt.args.pod); got != tt.want { - t.Errorf("shouldEnroll() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/controller/manage/kmesh_manage.go b/pkg/controller/manage/manage_controller.go similarity index 50% rename from pkg/controller/manage/kmesh_manage.go rename to pkg/controller/manage/manage_controller.go index 18d44fd6b..6b73a9519 100644 --- a/pkg/controller/manage/kmesh_manage.go +++ b/pkg/controller/manage/manage_controller.go @@ -17,18 +17,16 @@ package kmeshmanage import ( - "context" "fmt" "os" - "strings" - netns "github.com/containernetworking/plugins/pkg/ns" "istio.io/istio/pkg/spiffe" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -36,21 +34,10 @@ import ( ns "kmesh.net/kmesh/pkg/controller/netns" kmeshsecurity "kmesh.net/kmesh/pkg/controller/security" 
"kmesh.net/kmesh/pkg/logger" - "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils" ) -var ( - log = logger.NewLoggerField("manage_controller") - annotationDelPatch = []byte(fmt.Sprintf( - `{"metadata":{"annotations":{"%s":null}}}`, - constants.KmeshRedirectionAnnotation, - )) - annotationAddPatch = []byte(fmt.Sprintf( - `{"metadata":{"annotations":{"%s":"%s"}}}`, - constants.KmeshRedirectionAnnotation, - "enabled", - )) -) +var log = logger.NewLoggerField("manage_controller") const ( MaxRetries = 5 @@ -59,8 +46,21 @@ const ( ) type QueueItem struct { - pod *corev1.Pod - action string + podName string + podNs string + action string +} + +type KmeshManageController struct { + // TODO: share pod informer with bypass? + informerFactory informers.SharedInformerFactory + factory informers.SharedInformerFactory + podInformer cache.SharedIndexInformer + podLister v1.PodLister + namespaceInformer cache.SharedIndexInformer + namespaceLister v1.NamespaceLister + queue workqueue.RateLimitingInterface + client kubernetes.Interface } func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecurity.SecretManager) (*KmeshManageController, error) { @@ -70,8 +70,13 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeName) })) - podInformer := informerFactory.Core().V1().Pods().Informer() + podLister := informerFactory.Core().V1().Pods().Lister() + + factory := informers.NewSharedInformerFactory(client, 0) + namespaceInformer := factory.Core().V1().Namespaces().Informer() + namespaceLister := factory.Core().V1().Namespaces().Lister() + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -81,19 +86,24 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri log.Errorf("expected *corev1.Pod but got %T", obj) return } - if !shouldEnroll(client, pod) { + namespace, err := namespaceLister.Get(pod.Namespace) + if err != nil { + log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err) return } - log.Infof("%s/%s: enable Kmesh manage", pod.GetNamespace(), pod.GetName()) + if !utils.ShouldEnroll(pod, namespace) { + // TODO: check if pod has redirection annotation, then handleKmeshManage(nspath, false) + return + } + log.Infof("%s/%s: enable Kmesh manage", pod.GetNamespace(), pod.GetName()) nspath, _ := ns.GetPodNSpath(pod) - - if err := handleKmeshManage(nspath, true); err != nil { + if err := utils.HandleKmeshManage(nspath, true); err != nil { log.Errorf("failed to enable Kmesh manage") return } - queue.AddRateLimited(QueueItem{pod: pod, action: ActionAddAnnotation}) + queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation}) sendCertRequest(security, pod, kmeshsecurity.ADD) }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -104,31 +114,32 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri return } - //add Kmesh manage label for enable Kmesh control - if !shouldEnroll(client, oldPod) && shouldEnroll(client, newPod) { + namespace, err := namespaceLister.Get(newPod.Namespace) + if err != nil { + log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err) + return + } + // enable kmesh manage + if oldPod.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" && utils.ShouldEnroll(newPod, namespace) 
{ log.Infof("%s/%s: enable Kmesh manage", newPod.GetNamespace(), newPod.GetName()) - nspath, _ := ns.GetPodNSpath(newPod) - - if err := handleKmeshManage(nspath, true); err != nil { + if err := utils.HandleKmeshManage(nspath, true); err != nil { log.Errorf("failed to enable Kmesh manage") return } - queue.AddRateLimited(QueueItem{pod: newPod, action: ActionAddAnnotation}) + queue.AddRateLimited(QueueItem{podName: newPod.Name, podNs: newPod.Namespace, action: ActionAddAnnotation}) sendCertRequest(security, newPod, kmeshsecurity.ADD) } - //delete Kmesh manage label for disable Kmesh control - if shouldEnroll(client, oldPod) && !shouldEnroll(client, newPod) { + // disable kmesh manage + if oldPod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" && !utils.ShouldEnroll(newPod, namespace) { log.Infof("%s/%s: disable Kmesh manage", newPod.GetNamespace(), newPod.GetName()) - nspath, _ := ns.GetPodNSpath(newPod) - if err := handleKmeshManage(nspath, false); err != nil { + if err := utils.HandleKmeshManage(nspath, false); err != nil { log.Errorf("failed to disable Kmesh manage") return } - - queue.AddRateLimited(QueueItem{pod: newPod, action: ActionDeleteAnnotation}) + queue.AddRateLimited(QueueItem{podName: newPod.Name, podNs: newPod.Namespace, action: ActionDeleteAnnotation}) sendCertRequest(security, oldPod, kmeshsecurity.DELETE) } }, @@ -146,9 +157,11 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri return } } - if shouldEnroll(client, pod) { - log.Infof("%s/%s: Pod managed by Kmesh is being deleted", pod.GetNamespace(), pod.GetName()) + if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" { + log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName()) sendCertRequest(security, pod, kmeshsecurity.DELETE) + // We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns. 
+ // And we have done this in kmesh-cni } }, }); err != nil { @@ -156,134 +169,74 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri } return &KmeshManageController{ - informerFactory: informerFactory, - podInformer: podInformer, - queue: queue, - client: client, + informerFactory: informerFactory, + podInformer: podInformer, + podLister: podLister, + factory: factory, + namespaceInformer: namespaceInformer, + namespaceLister: namespaceLister, + queue: queue, + client: client, }, nil } -type KmeshManageController struct { - informerFactory informers.SharedInformerFactory - podInformer cache.SharedIndexInformer - queue workqueue.RateLimitingInterface - client kubernetes.Interface -} - func (c *KmeshManageController) Run(stopChan <-chan struct{}) { - go func() { - c.informerFactory.Start(stopChan) - if !cache.WaitForCacheSync(stopChan, c.podInformer.HasSynced) { - log.Error("Timed out waiting for caches to sync") - return - } - for { - c.processItems() - } - }() + defer c.queue.ShutDown() + c.informerFactory.Start(stopChan) + c.factory.Start(stopChan) + if !cache.WaitForCacheSync(stopChan, c.podInformer.HasSynced, c.namespaceInformer.HasSynced) { + log.Error("Timed out waiting for caches to sync") + return + } + for c.processItems() { + } } -func (c *KmeshManageController) processItems() { +func (c *KmeshManageController) processItems() bool { key, quit := c.queue.Get() if quit { - return + return false } defer c.queue.Done(key) queueItem, ok := key.(QueueItem) if !ok { log.Errorf("expected QueueItem but got %T", key) - return + return true } - var err error - if queueItem.action == ActionAddAnnotation { - err = addKmeshAnnotation(c.client, queueItem.pod) - } else if queueItem.action == ActionDeleteAnnotation { - err = delKmeshAnnotation(c.client, queueItem.pod) + pod, err := c.podLister.Pods(queueItem.podNs).Get(queueItem.podName) + if err != nil { + if apierrors.IsNotFound(err) { + log.Infof("pod %s/%s has been deleted", queueItem.podNs, queueItem.podName) + return true + } + log.Errorf("failed to get pod %s/%s: %v", queueItem.podNs, queueItem.podName, err) + } + if pod != nil { + // TODO: handle error + namespace, _ := c.namespaceLister.Get(pod.Namespace) + if queueItem.action == ActionAddAnnotation && utils.ShouldEnroll(pod, namespace) { + log.Infof("add annotation for pod %s/%s", pod.Namespace, pod.Name) + err = utils.PatchKmeshRedirectAnnotation(c.client, pod) + } else if queueItem.action == ActionDeleteAnnotation && !utils.ShouldEnroll(pod, namespace) { + log.Infof("delete annotation for pod %s/%s", pod.Namespace, pod.Name) + err = utils.DelKmeshRedirectAnnotation(c.client, pod) + } } if err != nil { if c.queue.NumRequeues(key) < MaxRetries { - log.Errorf("failed to handle pod %s/%s action %s, err: %v, will retry", queueItem.pod.Namespace, queueItem.pod.Name, queueItem.action, err) + log.Errorf("failed to handle pod %s/%s action %s, err: %v, will retry", queueItem.podNs, queueItem.podName, queueItem.action, err) c.queue.AddRateLimited(key) } else { - log.Errorf("failed to handle pod %s/%s action %s after %d retries, err: %v, giving up", queueItem.pod.Namespace, queueItem.pod.Name, queueItem.action, MaxRetries, err) + log.Errorf("failed to handle pod %s/%s action %s after %d retries, err: %v, giving up", queueItem.podNs, queueItem.podName, queueItem.action, MaxRetries, err) c.queue.Forget(key) } - return - } - - c.queue.Forget(key) -} - -func shouldEnroll(client kubernetes.Interface, pod *corev1.Pod) bool { - // Check if the Pod's label indicates it 
should be managed by Kmesh - if strings.EqualFold(pod.Labels[constants.DataPlaneModeLabel], constants.DataPlaneModeKmesh) { return true } - - // If it is a Pod of waypoint, it should not be managed by Kmesh - if strings.Contains(pod.Name, "waypoint") { - return false - } - - ns, err := client.CoreV1().Namespaces().Get(context.TODO(), pod.Namespace, metav1.GetOptions{}) - if err != nil { - log.Errorf("failed to get namespace %s: %v", pod.Namespace, err) - return false - } - // Check if the namespace's label indicates it should be managed by Kmesh - if strings.EqualFold(ns.Labels[constants.DataPlaneModeLabel], constants.DataPlaneModeKmesh) { - return true - } - return false -} - -func addKmeshAnnotation(client kubernetes.Interface, pod *corev1.Pod) error { - if value, exists := pod.Annotations[constants.KmeshRedirectionAnnotation]; exists && value == "enabled" { - log.Debugf("Pod %s in namespace %s already has annotation %s with value %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation, value) - return nil - } - _, err := client.CoreV1().Pods(pod.Namespace).Patch( - context.Background(), - pod.Name, - k8stypes.MergePatchType, - annotationAddPatch, - metav1.PatchOptions{}, - ) - return err -} - -func delKmeshAnnotation(client kubernetes.Interface, pod *corev1.Pod) error { - if _, exists := pod.Annotations[constants.KmeshRedirectionAnnotation]; !exists { - log.Debugf("Pod %s in namespace %s does not have annotation %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation) - return nil - } - _, err := client.CoreV1().Pods(pod.Namespace).Patch( - context.Background(), - pod.Name, - k8stypes.MergePatchType, - annotationDelPatch, - metav1.PatchOptions{}, - ) - return err -} - -func handleKmeshManage(ns string, op bool) error { - execFunc := func(netns.NetNS) error { - port := constants.OperEnableControl - if !op { - port = constants.OperDisableControl - } - return nets.TriggerControlCommand(port) - } - - if err := netns.WithNetNSPath(ns, execFunc); err != nil { - err = fmt.Errorf("enter ns path :%v, run execFunc failed: %v", ns, err) - return err - } - return nil + c.queue.Forget(key) + return true } func sendCertRequest(security *kmeshsecurity.SecretManager, pod *corev1.Pod, op int) { diff --git a/pkg/controller/manage/manage_controller_test.go b/pkg/controller/manage/manage_controller_test.go new file mode 100644 index 000000000..918cd69f0 --- /dev/null +++ b/pkg/controller/manage/manage_controller_test.go @@ -0,0 +1,326 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kmeshmanage + +import ( + "context" + "fmt" + "os" + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/agiledragon/gomonkey/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "istio.io/istio/pkg/test/util/retry" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + + "kmesh.net/kmesh/pkg/constants" + "kmesh.net/kmesh/pkg/utils" +) + +func waitAndCheckManageAction(t *testing.T, enabled *atomic.Bool, disabled *atomic.Bool, enableExpected bool, disableExpected bool) { + retry.UntilSuccess(func() error { + // Wait for the handleKmeshManage to be called + if enableExpected != enabled.Load() || disableExpected != disabled.Load() { + return fmt.Errorf("enabled: %v, disabled: %v", enabled.Load(), disabled.Load()) + } + return nil + }) + assert.Equal(t, enableExpected, enabled.Load(), "unexpected value for enabled flag") + assert.Equal(t, disableExpected, disabled.Load(), "unexpected value for disabled flag") +} + +func TestHandleKmeshManage(t *testing.T) { + client := fake.NewSimpleClientset() + + err := os.Setenv("NODE_NAME", "test_node") + require.NoError(t, err) + t.Cleanup(func() { + os.Unsetenv("NODE_NAME") + }) + controller, err := NewKmeshManageController(client, nil) + if err != nil { + t.Fatalf("error creating KmeshManageController: %v", err) + } + + stopChan := make(chan struct{}) + defer close(stopChan) + + go controller.Run(stopChan) + cache.WaitForCacheSync(stopChan, controller.podInformer.HasSynced, controller.namespaceInformer.HasSynced) + + enabled := atomic.Bool{} + disabled := atomic.Bool{} + + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(utils.HandleKmeshManage, func(ns string, op bool) error { + if op { + enabled.Store(true) + } else { + disabled.Store(true) + } + return nil + }) + + patches.ApplyMethodFunc(reflect.TypeOf(controller.queue), "AddRateLimited", func(item interface{}) { + queueItem, ok := item.(QueueItem) + if !ok { + t.Logf("expected QueueItem but got %T", item) + return + } + pod, err := controller.podLister.Pods(queueItem.podNs).Get(queueItem.podName) + if err != nil { + if apierrors.IsNotFound(err) { + t.Logf("pod %s/%s has been deleted", queueItem.podNs, queueItem.podName) + return + } + t.Errorf("failed to get pod %s/%s: %v", queueItem.podNs, queueItem.podName, err) + } + + if pod != nil { + namespace, _ := controller.namespaceLister.Get(pod.Namespace) + if queueItem.action == ActionAddAnnotation && utils.ShouldEnroll(pod, namespace) { + t.Logf("add annotation for pod %s/%s", pod.Namespace, pod.Name) + err = utils.PatchKmeshRedirectAnnotation(controller.client, pod) + } else if queueItem.action == ActionDeleteAnnotation && !utils.ShouldEnroll(pod, namespace) { + t.Logf("delete annotation for pod %s/%s", pod.Namespace, pod.Name) + err = utils.DelKmeshRedirectAnnotation(controller.client, pod) + } + } + if err != nil { + t.Errorf("failed to handle pod %s/%s: %v", queueItem.podNs, queueItem.podName, err) + } + }) + + podWithoutLabel := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "ut-pod", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + podWithLabel := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "ut-pod", + 
Labels: map[string]string{constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh}, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + podWithNoneLabel := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "ut-pod", + Labels: map[string]string{constants.DataPlaneModeLabel: "none"}, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + + nsWithoutLabel := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + nsWithLabel := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + Labels: map[string]string{constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh}, + }, + } + + type args struct { + namespace *corev1.Namespace + pod *corev1.Pod + create, update, delete bool + } + tests := []struct { + name string + args args + expectManaged bool + expectDisManaged bool + }{ + { + name: "1. ns without label, pod without label", + args: args{ + namespace: nsWithoutLabel, + pod: podWithoutLabel, + create: true, + }, + expectManaged: false, + expectDisManaged: false, + }, + { + name: "2. ns without label, pod update with label", + args: args{ + namespace: nsWithoutLabel, + pod: podWithLabel, + update: true, + }, + expectManaged: true, + expectDisManaged: false, + }, + { + name: "2.1 ns without label, pod update with `none` label", + args: args{ + namespace: nsWithoutLabel, + pod: podWithNoneLabel, + update: true, + }, + expectManaged: false, + expectDisManaged: true, + }, + { + name: "3. ns without label, pod with none label delete", + args: args{ + namespace: nsWithoutLabel, + pod: podWithNoneLabel, + delete: true, + }, + expectManaged: false, + expectDisManaged: false, + }, + { + name: "4. ns without label, pod with label", + args: args{ + namespace: nsWithoutLabel, + pod: podWithLabel, + create: true, + }, + expectManaged: true, + }, + { + name: "4.1 ns without label, pod with label delete", + args: args{ + namespace: nsWithoutLabel, + pod: podWithLabel, + delete: true, + }, + expectManaged: false, + expectDisManaged: false, + }, + + { + name: "5. ns with label, pod without label", + args: args{ + namespace: nsWithLabel, + pod: podWithoutLabel, + create: true, + }, + expectManaged: true, + expectDisManaged: false, + }, + { + name: "6. ns with label, pod update with label", + args: args{ + namespace: nsWithLabel, + pod: podWithLabel, + update: true, + }, + expectManaged: false, + expectDisManaged: false, + }, + { + name: "7. ns with label, pod update with none label", + args: args{ + namespace: nsWithLabel, + pod: podWithNoneLabel, + update: true, + }, + expectManaged: false, + expectDisManaged: true, + }, + { + name: "8. 
ns with label, pod delete with none label", + args: args{ + namespace: nsWithLabel, + pod: podWithNoneLabel, + delete: true, + }, + expectDisManaged: false, + }, + } + + _, err = client.CoreV1().Namespaces().Create(context.TODO(), nsWithoutLabel, metav1.CreateOptions{}) + assert.NoError(t, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err = client.CoreV1().Namespaces().Update(context.TODO(), tt.args.namespace, metav1.UpdateOptions{}) + assert.NoError(t, err) + // TODO: find a way to wait for namespace informer to sync + time.Sleep(5 * time.Millisecond) + + enabled.Store(false) + disabled.Store(false) + + if tt.args.create { + pod := tt.args.pod.DeepCopy() + _, err = client.CoreV1().Pods(tt.args.namespace.Name).Create(context.TODO(), pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + + if tt.args.update { + pod, _ := client.CoreV1().Pods(tt.args.namespace.Name).Get(context.TODO(), tt.args.pod.Name, metav1.GetOptions{}) + if pod != nil { + pod.Labels = tt.args.pod.Labels + } + _, err = client.CoreV1().Pods(tt.args.namespace.Name).Update(context.TODO(), pod, metav1.UpdateOptions{}) + assert.NoError(t, err) + } + + if tt.args.delete { + err = client.CoreV1().Pods(tt.args.namespace.Name).Delete(context.TODO(), tt.args.pod.Name, metav1.DeleteOptions{}) + assert.NoError(t, err) + } + + waitAndCheckManageAction(t, &enabled, &disabled, tt.expectManaged, tt.expectDisManaged) + }) + } +} diff --git a/pkg/utils/enroll.go b/pkg/utils/enroll.go new file mode 100644 index 000000000..347143b29 --- /dev/null +++ b/pkg/utils/enroll.go @@ -0,0 +1,126 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "context" + "fmt" + "strings" + + netns "github.com/containernetworking/plugins/pkg/ns" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + + "kmesh.net/kmesh/pkg/constants" + "kmesh.net/kmesh/pkg/nets" + "kmesh.net/kmesh/pkg/utils/istio" +) + +// ShouldEnroll checks whether a pod should be managed by kmesh. 
+// Kmesh manages a pod if a pod has "istio.io/dataplane-mode: kmesh" label +// or the namespace where it resides has the label while pod have no "istio.io/dataplane-mode: none" label +// Excluding cases: a pod has sidecar injected, or the pod is istio managed waypoint +// https://github.com/istio/istio/blob/33539491628fe5f3ad4f5f1fb339b0da9455c028/manifests/charts/istio-control/istio-discovery/files/waypoint.yaml#L35 +func ShouldEnroll(pod *corev1.Pod, ns *corev1.Namespace) bool { + if istio.PodHasSidecar(pod) { + return false + } + + // If it is a Pod of waypoint, it should not be managed by Kmesh + // Exclude istio managed gateway + if gateway, ok := pod.Labels["gateway.istio.io/managed"]; ok { + if strings.EqualFold(gateway, "istio.io-mesh-controller") { + return false + } + } + + var nsMode string + if ns != nil { + nsMode = ns.Labels[constants.DataPlaneModeLabel] + } + podMode := pod.Labels[constants.DataPlaneModeLabel] + // check if pod label contains istio.io/dataplane-mode: kmesh + if strings.EqualFold(podMode, constants.DataPlaneModeKmesh) { + return true + } + // check if ns label contains istio.io/dataplane-mode: kmesh, but pod is not excluded + if strings.EqualFold(nsMode, constants.DataPlaneModeKmesh) && podMode != "none" { + return true + } + return false +} + +func HandleKmeshManage(ns string, enroll bool) error { + execFunc := func(netns.NetNS) error { + port := constants.OperEnableControl + if !enroll { + port = constants.OperDisableControl + } + return nets.TriggerControlCommand(port) + } + + if err := netns.WithNetNSPath(ns, execFunc); err != nil { + err = fmt.Errorf("enter ns path :%v, run execFunc failed: %v", ns, err) + return err + } + return nil +} + +var ( + annotationDelPatch = []byte(fmt.Sprintf( + `{"metadata":{"annotations":{"%s":null}}}`, + constants.KmeshRedirectionAnnotation, + )) + + annotationAddPatch = []byte(fmt.Sprintf( + `{"metadata":{"annotations":{"%s":"%s"}}}`, + constants.KmeshRedirectionAnnotation, + "enabled", + )) +) + +func PatchKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) error { + if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" { + log.Debugf("Pod %s in namespace %s already has annotation %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation) + return nil + } + _, err := client.CoreV1().Pods(pod.Namespace).Patch( + context.Background(), + pod.Name, + k8stypes.MergePatchType, + annotationAddPatch, + metav1.PatchOptions{}, + ) + return err +} + +func DelKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) error { + if _, exists := pod.Annotations[constants.KmeshRedirectionAnnotation]; !exists { + log.Debugf("Pod %s in namespace %s does not have annotation %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation) + return nil + } + _, err := client.CoreV1().Pods(pod.Namespace).Patch( + context.Background(), + pod.Name, + k8stypes.MergePatchType, + annotationDelPatch, + metav1.PatchOptions{}, + ) + return err +} diff --git a/pkg/utils/enroll_test.go b/pkg/utils/enroll_test.go new file mode 100644 index 000000000..77147cbdc --- /dev/null +++ b/pkg/utils/enroll_test.go @@ -0,0 +1,355 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "istio.io/api/annotation" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + + "kmesh.net/kmesh/pkg/constants" +) + +func TestShouldEnroll(t *testing.T) { + type args struct { + namespace *corev1.Namespace + pod *corev1.Pod + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "pod with label", + args: args{ + namespace: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ut-test", + }, + }, + pod: &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ut-test", + Name: "ut-pod", + Labels: map[string]string{ + constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, + }, + }, + }, + }, + want: true, + }, + { + name: "sidecar misconfigured label", + args: args{ + namespace: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ut-test", + }, + }, + pod: &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ut-test", + Name: "ut-pod", + Labels: map[string]string{ + constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, + }, + Annotations: map[string]string{ + annotation.SidecarStatus.Name: "", + }, + }, + }, + }, + want: false, + }, + { + name: "pod and namespace without label", + args: args{ + namespace: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ut-test", + }, + }, + pod: &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ut-test", + Name: "ut-pod", + }, + }, + }, + want: false, + }, + { + name: "namespace with label", + args: args{ + namespace: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ut-test", + Labels: map[string]string{ + constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, + }, + }, + }, + pod: &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ut-test", + Name: "ut-pod", + }, + }, + }, + want: true, + }, + { + name: "pod with none label not managed by Kmesh", + args: args{ + namespace: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ut-test", + Labels: map[string]string{ + constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, + }, + }, + }, + pod: &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ut-test", + Name: "ut-pod", + Labels: map[string]string{ + constants.DataPlaneModeLabel: "none", + }, + }, + }, + }, + want: false, + }, 
+ { + name: "waypoint should not managed by Kmesh", + args: args{ + namespace: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ut-test", + Labels: map[string]string{ + constants.DataPlaneModeLabel: constants.DataPlaneModeKmesh, + }, + }, + }, + pod: &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ut-test", + Name: "ut-waypoint", + Labels: map[string]string{ + "gateway.istio.io/managed": "istio.io-mesh-controller", + }, + }, + }, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ShouldEnroll(tt.args.pod, tt.args.namespace); got != tt.want { + t.Errorf("shouldEnroll() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestHandleKmeshManage(t *testing.T) { + type args struct { + ns string + enroll bool + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "enroll true", + args: args{ + enroll: true, + }, + wantErr: false, + }, + { + name: "enroll false", + args: args{ + enroll: false, + }, + wantErr: false, + }, + { + name: "enroll false", + args: args{ + ns: "invalid ns", + enroll: false, + }, + wantErr: true, + }, + } + pid := os.Getpid() + ns := fmt.Sprintf("/proc/%d/ns/net", pid) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var err error + if tt.args.ns != "" { + err = HandleKmeshManage(tt.args.ns, tt.args.enroll) + } else { + err = HandleKmeshManage(ns, tt.args.enroll) + } + if (err != nil) != tt.wantErr { + t.Errorf("HandleKmeshManage() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} +func TestPatchKmeshRedirectAnnotation(t *testing.T) { + client := fake.NewSimpleClientset() + namespace := "test-namespace" + podName := "test-pod" + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: podName, + }, + } + + _, err := client.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{}) + assert.NoError(t, err) + + err = PatchKmeshRedirectAnnotation(client, pod) + if err != nil { + t.Errorf("PatchKmeshRedirectAnnotation() returned an error: %v", err) + } + + got, err := client.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) + if err != nil { + t.Errorf("Failed to get the patched pod: %v", err) + } + + if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" { + t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation]) + } + + err = PatchKmeshRedirectAnnotation(client, got) + if err != nil { + t.Errorf("PatchKmeshRedirectAnnotation() returned an error: %v", err) + } + + got, err = client.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) + if err != nil { + t.Errorf("Failed to get the patched pod: %v", err) + } + + if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" { + t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation]) + } +} + +func TestDelKmeshRedirectAnnotation(t *testing.T) { + client := fake.NewSimpleClientset() + namespace := "test-namespace" + podName := "test-pod" + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: podName, + Annotations: map[string]string{ + constants.KmeshRedirectionAnnotation: "enabled", + }, + }, + } 
+ + _, err := client.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{}) + assert.NoError(t, err) + + err = DelKmeshRedirectAnnotation(client, pod) + if err != nil { + t.Errorf("DelKmeshRedirectAnnotation() returned an error: %v", err) + } + + got, err := client.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{}) + if err != nil { + t.Errorf("Failed to get patched pod: %v", err) + } + + if _, exists := got.Annotations[constants.KmeshRedirectionAnnotation]; exists { + t.Errorf("Annotation %s was not deleted from pod %s in namespace %s", constants.KmeshRedirectionAnnotation, podName, namespace) + } + + err = DelKmeshRedirectAnnotation(client, pod) + if err != nil { + t.Errorf("DelKmeshRedirectAnnotation() returned an error: %v", err) + } +} diff --git a/pkg/utils/istio/sidecar.go b/pkg/utils/istio/sidecar.go new file mode 100644 index 000000000..92e85c478 --- /dev/null +++ b/pkg/utils/istio/sidecar.go @@ -0,0 +1,30 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package istio + +import ( + "istio.io/api/annotation" + corev1 "k8s.io/api/core/v1" +) + +func PodHasSidecar(pod *corev1.Pod) bool { + if _, f := pod.GetAnnotations()[annotation.SidecarStatus.Name]; f { + return true + } + + return false +} diff --git a/pkg/utils/istio/sidecar_test.go b/pkg/utils/istio/sidecar_test.go new file mode 100644 index 000000000..18de4045f --- /dev/null +++ b/pkg/utils/istio/sidecar_test.go @@ -0,0 +1,71 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package istio + +import ( + "testing" + + "istio.io/api/annotation" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestPodHasSidecar(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + want bool + }{ + { + name: "pod with annotation", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{ + annotation.SidecarStatus.Name: "placeholder", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + want: true, + }, + { + name: "pod without annotation", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Annotations: map[string]string{}, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := PodHasSidecar(tt.pod); got != tt.want { + t.Errorf("PodHasSidecar() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/test/e2e/baseline_test.go b/test/e2e/baseline_test.go index ca43deb89..9f5dcbdca 100644 --- a/test/e2e/baseline_test.go +++ b/test/e2e/baseline_test.go @@ -24,20 +24,52 @@ package kmesh import ( + "context" "fmt" + "net/http" "strings" "testing" "time" + "istio.io/istio/pkg/config/constants" + echot "istio.io/istio/pkg/test/echo" "istio.io/istio/pkg/test/echo/common/scheme" "istio.io/istio/pkg/test/framework" "istio.io/istio/pkg/test/framework/components/echo" "istio.io/istio/pkg/test/framework/components/echo/check" + "istio.io/istio/pkg/test/framework/components/echo/common/ports" + "istio.io/istio/pkg/test/scopes" "istio.io/istio/pkg/util/sets" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) +func IsL7() echo.Checker { + return check.Each(func(r echot.Response) error { + // TODO: response headers? + _, f := r.RequestHeaders[http.CanonicalHeaderKey("X-Request-Id")] + if !f { + return fmt.Errorf("X-Request-Id not set, is L7 processing enabled?") + } + return nil + }) +} + +func IsL4() echo.Checker { + return check.Each(func(r echot.Response) error { + // TODO: response headers? + _, f := r.RequestHeaders[http.CanonicalHeaderKey("X-Request-Id")] + if f { + return fmt.Errorf("X-Request-Id set, is L7 processing enabled unexpectedly?") + } + return nil + }) +} + var ( - callOptions = []echo.CallOptions{ + httpValidator = check.And(check.OK(), IsL7()) + tcpValidator = check.And(check.OK(), IsL4()) + callOptions = []echo.CallOptions{ { Port: echo.Port{Name: "http"}, Scheme: scheme.HTTP, @@ -51,6 +83,80 @@ var ( } ) +func supportsL7(opt echo.CallOptions, src, dst echo.Instance) bool { + isL7Scheme := opt.Scheme == scheme.HTTP || opt.Scheme == scheme.GRPC || opt.Scheme == scheme.WebSocket + return dst.Config().HasAnyWaypointProxy() && isL7Scheme +} + +func OriginalSourceCheck(t framework.TestContext, src echo.Instance) echo.Checker { + // Check that each response saw one of the workload IPs for the src echo instance + addresses := sets.New(src.WorkloadsOrFail(t).Addresses()...) + return check.Each(func(response echot.Response) error { + if !addresses.Contains(response.IP) { + return fmt.Errorf("expected original source (%v) to be propagated, but got %v", addresses.UnsortedList(), response.IP) + } + return nil + }) +} + +// Test access to service, enabling L7 processing and propagating original src when appropriate. 
+func TestServices(t *testing.T) { + runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { + if opt.Scheme != scheme.HTTP { + return + } + if supportsL7(opt, src, dst) { + opt.Check = httpValidator + } else { + opt.Check = tcpValidator + } + + if !dst.Config().HasServiceAddressedWaypointProxy() { + // Check original source, unless there is a waypoint in the path. For waypoint, we don't (yet?) propagate original src. + opt.Check = check.And(opt.Check, OriginalSourceCheck(t, src)) + } + + src.CallOrFail(t, opt) + }) +} + +// Test access directly using pod IP. +func TestPodIP(t *testing.T) { + framework.NewTest(t).Run(func(t framework.TestContext) { + for _, src := range apps.All { + for _, srcWl := range src.WorkloadsOrFail(t) { + srcWl := srcWl + t.NewSubTestf("from %v %v", src.Config().Service, srcWl.Address()).Run(func(t framework.TestContext) { + for _, dst := range apps.All { + for _, dstWl := range dst.WorkloadsOrFail(t) { + t.NewSubTestf("to %v %v", dst.Config().Service, dstWl.Address()).Run(func(t framework.TestContext) { + src, dst, srcWl, dstWl := src, dst, srcWl, dstWl + if src.Config().HasSidecar() { + t.Skip("not supported yet") + } + for _, opt := range callOptions { + opt := opt.DeepCopy() + opt.Check = tcpValidator + + opt.Address = dstWl.Address() + opt.Check = check.And(opt.Check, check.Hostname(dstWl.PodName())) + + opt.Port = echo.Port{ServicePort: ports.All().MustForName(opt.Port.Name).WorkloadPort} + opt.ToWorkload = dst.WithWorkloads(dstWl) + + t.NewSubTestf("%v", opt.Scheme).RunParallel(func(t framework.TestContext) { + src.WithWorkloads(srcWl).CallOrFail(t, opt) + }) + } + }) + } + } + }) + } + } + }) +} + func TestTrafficSplit(t *testing.T) { runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { // Need at least one waypoint proxy and HTTP @@ -342,6 +448,132 @@ spec: }) } +// Test add/remove waypoint at pod granularity. +func TestAddRemovePodWaypoint(t *testing.T) { + framework.NewTest(t).Run(func(t framework.TestContext) { + waypoint := "pod-waypoint" + newWaypointProxyOrFail(t, t, apps.Namespace, waypoint, constants.WorkloadTraffic) + + t.Cleanup(func() { + deleteWaypointProxyOrFail(t, t, apps.Namespace, waypoint) + }) + + dst := apps.EnrolledToKmesh + t.NewSubTest("before").Run(func(t framework.TestContext) { + for _, src := range apps.All { + if src.Config().IsUncaptured() { + continue + } + for _, dstWl := range dst.WorkloadsOrFail(t) { + t.NewSubTestf("from %v", src.Config().Service).Run(func(t framework.TestContext) { + c := IsL4() + opt := echo.CallOptions{ + Address: dstWl.Address(), + Port: echo.Port{ServicePort: ports.All().MustForName("http").WorkloadPort}, + Scheme: scheme.HTTP, + Count: 10, + Check: check.And(check.OK(), c), + } + src.CallOrFail(t, opt) + }) + } + + } + }) + + // Configure pods to use waypoint. + for _, dstWl := range dst.WorkloadsOrFail(t) { + SetWaypoint(t, apps.Namespace.Name(), dstWl.PodName(), waypoint, Workload) + } + + // Now should always be L7. 
+ t.NewSubTest("after").Run(func(t framework.TestContext) { + for _, src := range apps.All { + if src.Config().IsUncaptured() { + continue + } + for _, dstWl := range dst.WorkloadsOrFail(t) { + t.NewSubTestf("from %v", src.Config().Service).Run(func(t framework.TestContext) { + c := IsL4() + opt := echo.CallOptions{ + Address: dstWl.Address(), + Port: echo.Port{ServicePort: ports.All().MustForName("http").WorkloadPort}, + Scheme: scheme.HTTP, + Count: 10, + Check: check.And(check.OK(), c), + } + src.CallOrFail(t, opt) + }) + } + + } + }) + }) +} + +// Test add/remove waypoint at ns or service granularity. +func TestRemoveAddNsOrServiceWaypoint(t *testing.T) { + for _, granularity := range []Granularity{Service /*,Namespace*/} { + framework.NewTest(t).Run(func(t framework.TestContext) { + var waypoint string + switch granularity { + case Namespace: + waypoint = "namespace-waypoint" + case Service: + waypoint = "service-waypoint" + } + + newWaypointProxyOrFail(t, t, apps.Namespace, waypoint, constants.ServiceTraffic) + + t.Cleanup(func() { + deleteWaypointProxyOrFail(t, t, apps.Namespace, waypoint) + }) + + t.NewSubTest("before").Run(func(t framework.TestContext) { + dst := apps.EnrolledToKmesh + for _, src := range apps.All { + if src.Config().IsUncaptured() { + continue + } + t.NewSubTestf("from %v", src.Config().Service).Run(func(t framework.TestContext) { + c := IsL4() + opt := echo.CallOptions{ + To: dst, + Port: echo.Port{Name: "http"}, + Scheme: scheme.HTTP, + Count: 10, + Check: check.And(check.OK(), c), + } + src.CallOrFail(t, opt) + }) + } + }) + + SetWaypoint(t, apps.Namespace.Name(), EnrolledToKmesh, waypoint, granularity) + + // Now should always be L7 + t.NewSubTest("after").Run(func(t framework.TestContext) { + dst := apps.EnrolledToKmesh + for _, src := range apps.All { + if src.Config().IsUncaptured() { + continue + } + t.NewSubTestf("from %v", src.Config().Service).Run(func(t framework.TestContext) { + opt := echo.CallOptions{ + To: dst, + Port: echo.Port{Name: "http"}, + Scheme: scheme.HTTP, + Count: 10, + Check: check.And(check.OK(), IsL7()), + } + src.CallOrFail(t, opt) + }) + } + }) + }) + } +} + func runTest(t *testing.T, f func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions)) { framework.NewTest(t).Run(func(t framework.TestContext) { runTestContext(t, f) @@ -386,3 +618,47 @@ func runTestContext(t framework.TestContext, f func(t framework.TestContext, src }) } } + +type Granularity int + +const ( + Namespace Granularity = iota + Service + Workload +) + +func SetWaypoint(t framework.TestContext, ns string, name string, waypoint string, granularity Granularity) { + for _, c := range t.Clusters() { + setWaypoint := func(waypoint string) error { + if waypoint == "" { + waypoint = "null" + } else { + waypoint = fmt.Sprintf("%q", waypoint) + } + label := []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":%s}}}`, constants.AmbientUseWaypointLabel, waypoint)) + + switch granularity { + case Namespace: + _, err := c.Kube().CoreV1().Namespaces().Patch(context.TODO(), ns, types.MergePatchType, label, metav1.PatchOptions{}) + return err + case Service: + _, err := c.Kube().CoreV1().Services(ns).Patch(context.TODO(), name, types.MergePatchType, label, metav1.PatchOptions{}) + return err + case Workload: + _, err := c.Kube().CoreV1().Pods(ns).Patch(context.TODO(), name, types.MergePatchType, label, metav1.PatchOptions{}) + return err + } + + return nil + } + + if err := setWaypoint(waypoint); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if 
err := setWaypoint(""); err != nil { + scopes.Framework.Errorf("failed resetting waypoint for %s/%s", ns, name) + } + }) + } +} diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index e4fc89f03..aa919a180 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -37,12 +37,14 @@ import ( "istio.io/istio/pkg/config/protocol" "istio.io/istio/pkg/config/schema/gvk" istioKube "istio.io/istio/pkg/kube" + "istio.io/istio/pkg/test" "istio.io/istio/pkg/test/framework" "istio.io/istio/pkg/test/framework/components/ambient" "istio.io/istio/pkg/test/framework/components/crd" "istio.io/istio/pkg/test/framework/components/echo" "istio.io/istio/pkg/test/framework/components/echo/common/ports" "istio.io/istio/pkg/test/framework/components/echo/deployment" + "istio.io/istio/pkg/test/framework/components/echo/match" "istio.io/istio/pkg/test/framework/components/istio" "istio.io/istio/pkg/test/framework/components/namespace" "istio.io/istio/pkg/test/framework/resource" @@ -69,6 +71,9 @@ type EchoDeployments struct { // All echo services All echo.Instances + // The echo service which is enrolled to Kmesh without waypoint. + EnrolledToKmesh echo.Instances + // WaypointProxies by WaypointProxies map[string]ambient.WaypointProxy } @@ -170,6 +175,7 @@ func SetupApps(t resource.Context, i istio.Instance, apps *EchoDeployments) erro scopes.Framework.Infof("built %v", b.Config().Service) } apps.All = echos + apps.EnrolledToKmesh = match.ServiceName(echo.NamespacedName{Name: EnrolledToKmesh, Namespace: apps.Namespace}).GetMatches(echos) if apps.WaypointProxies == nil { apps.WaypointProxies = make(map[string]ambient.WaypointProxy) @@ -180,7 +186,7 @@ func SetupApps(t resource.Context, i istio.Instance, apps *EchoDeployments) erro wlwp := echo.Config().WorkloadWaypointProxy if svcwp != "" { if _, found := apps.WaypointProxies[svcwp]; !found { - apps.WaypointProxies[svcwp], err = newWaypointProxy(t, apps.Namespace, svcwp) + apps.WaypointProxies[svcwp], err = newWaypointProxy(t, apps.Namespace, svcwp, constants.ServiceTraffic) if err != nil { return err } @@ -188,7 +194,7 @@ func SetupApps(t resource.Context, i istio.Instance, apps *EchoDeployments) erro } if wlwp != "" { if _, found := apps.WaypointProxies[wlwp]; !found { - apps.WaypointProxies[wlwp], err = newWaypointProxy(t, apps.Namespace, wlwp) + apps.WaypointProxies[wlwp], err = newWaypointProxy(t, apps.Namespace, wlwp, constants.WorkloadTraffic) if err != nil { return err } @@ -240,7 +246,13 @@ func (k kubeComponent) Close() error { return nil } -func newWaypointProxy(ctx resource.Context, ns namespace.Instance, name string) (ambient.WaypointProxy, error) { +func newWaypointProxyOrFail(t test.Failer, ctx resource.Context, ns namespace.Instance, name string, trafficType string) { + if _, err := newWaypointProxy(ctx, ns, name, trafficType); err != nil { + t.Fatal("create new waypoint proxy failed: %v", err) + } +} + +func newWaypointProxy(ctx resource.Context, ns namespace.Instance, name string, trafficType string) (ambient.WaypointProxy, error) { err := crd.DeployGatewayAPI(ctx) if err != nil { return nil, err @@ -255,6 +267,9 @@ func newWaypointProxy(ctx resource.Context, ns namespace.Instance, name string) Name: name, Namespace: ns.Name(), Annotations: make(map[string]string, 0), + Labels: map[string]string{ + constants.AmbientWaypointForTrafficTypeLabel: trafficType, + }, }, Spec: gateway.GatewaySpec{ GatewayClassName: constants.WaypointGatewayClassName, @@ -314,3 +329,15 @@ func newWaypointProxy(ctx resource.Context, ns namespace.Instance, 
name string) return server, nil } + +func deleteWaypointProxyOrFail(t test.Failer, ctx resource.Context, ns namespace.Instance, name string) { + if err := deleteWaypointProxy(ctx, ns, name); err != nil { + t.Fatal("delete waypoint proxy failed: %v", err) + } +} + +func deleteWaypointProxy(ctx resource.Context, ns namespace.Instance, name string) error { + cls := ctx.Clusters().Default() + + return cls.GatewayAPI().GatewayV1().Gateways(ns.Name()).Delete(context.Background(), name, metav1.DeleteOptions{}) +} diff --git a/test/e2e/run_test.sh b/test/e2e/run_test.sh index e260f3d3b..9d48224fa 100755 --- a/test/e2e/run_test.sh +++ b/test/e2e/run_test.sh @@ -184,6 +184,21 @@ function install_dependencies() { rm -rf istio-${ISTIO_VERSION} } +function cleanup_kind_cluster() { + local NAME="${1:-kmesh-testing}" + echo "Deleting KinD cluster with name=${NAME}" + kind delete cluster --name="${NAME}" + echo "KinD cluster ${NAME} cleaned up" +} + +function cleanup_docker_registry() { + echo "Stopping Docker registry named '${KIND_REGISTRY_NAME}'..." + docker stop "${KIND_REGISTRY_NAME}" || echo "Failed to stop or no such registry '${KIND_REGISTRY_NAME}'." + + echo "Removing Docker registry named '${KIND_REGISTRY_NAME}'..." + docker rm "${KIND_REGISTRY_NAME}" || echo "Failed to remove or no such registry '${KIND_REGISTRY_NAME}'." +} + while (( "$#" )); do case "$1" in --skip-install-dep) @@ -208,6 +223,11 @@ while (( "$#" )); do IPV6=true shift ;; + --cleanup) + CLEANUP_KIND=true + CLEANUP_REGISTRY=true + shift + ;; esac done @@ -231,3 +251,11 @@ if [[ -z "${SKIP_SETUP:-}" ]]; then fi go test -v -tags=integ $ROOT_DIR/test/e2e/... -count=1 + +if [[ -n "${CLEANUP_KIND}" ]]; then + cleanup_kind_cluster +fi + +if [[ -n "${CLEANUP_REGISTRY}" ]]; then + cleanup_docker_registry +fi \ No newline at end of file
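As wired here, passing `--cleanup` runs the suite as usual and then tears down the `kmesh-testing` KinD cluster and the local image registry, e.g. `./test/e2e/run_test.sh --cleanup`; without the flag both are left running for iterative development.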