From 120649fe8671aca35892f8cabb0c2f73db36e286 Mon Sep 17 00:00:00 2001 From: ashish Date: Sat, 28 May 2022 18:51:29 +0530 Subject: [PATCH 1/6] Multi context refactor Signed-off-by: ashish --- go.mod | 1 + osm/install.go | 50 +++++++++++----- osm/oam.go | 25 ++++---- osm/operations.go | 9 ++- osm/osm.go | 17 +++--- osm/sample_apps.go | 141 ++++++++++++++++++++++++++------------------- 6 files changed, 146 insertions(+), 97 deletions(-) diff --git a/go.mod b/go.mod index 16b106e9..063fb68d 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ replace ( github.com/kudobuilder/kuttl => github.com/layer5io/kuttl v0.4.1-0.20200806180306-b7e46afd657f go.opentelemetry.io/otel v1.3.0 => go.opentelemetry.io/otel v0.11.0 go.opentelemetry.io/otel/sdk v1.3.0 => go.opentelemetry.io/otel/sdk v0.11.0 + github.com/layer5io/meshery-adapter-library => ../meshery-adapter-library ) require ( diff --git a/osm/install.go b/osm/install.go index 9f8cfd05..9af7ca97 100644 --- a/osm/install.go +++ b/osm/install.go @@ -16,13 +16,14 @@ package osm import ( "fmt" + "sync" "github.com/layer5io/meshery-adapter-library/adapter" "github.com/layer5io/meshery-adapter-library/status" mesherykube "github.com/layer5io/meshkit/utils/kubernetes" ) -func (h *Handler) installOSM(del bool, version, ns string) (string, error) { +func (h *Handler) installOSM(del bool, version, ns string, kubeconfigs []string) (string, error) { h.Log.Debug(fmt.Sprintf("Requested install of version: %s", version)) h.Log.Debug(fmt.Sprintf("Requested action is delete: %v", del)) h.Log.Debug(fmt.Sprintf("Requested action is in namespace: %s", ns)) @@ -38,7 +39,7 @@ func (h *Handler) installOSM(del bool, version, ns string) (string, error) { } h.Log.Info("Installing...") - err = h.applyHelmChart(del, version, ns) + err = h.applyHelmChart(del, version, ns, kubeconfigs) if err != nil { return st, ErrApplyHelmChart(err) } @@ -51,8 +52,7 @@ func (h *Handler) installOSM(del bool, version, ns string) (string, error) { return st, nil } -func (h *Handler) applyHelmChart(del bool, version, namespace string) error { - kClient := h.MesheryKubeclient +func (h *Handler) applyHelmChart(del bool, version, namespace string, kubeconfigs []string) error { repo := "https://openservicemesh.github.io/osm/" chart := "osm" @@ -62,14 +62,36 @@ func (h *Handler) applyHelmChart(del bool, version, namespace string) error { } else { act = mesherykube.INSTALL } - return kClient.ApplyHelmChart(mesherykube.ApplyHelmChartConfig{ - ChartLocation: mesherykube.HelmChartLocation{ - Repository: repo, - Chart: chart, - Version: version, - }, - Namespace: namespace, - Action: act, - CreateNamespace: true, - }) + var wg sync.WaitGroup + var errs []error + for _, kubeconfig := range kubeconfigs { + wg.Add(1) + go func(kubeconfig string) { + defer wg.Done() + kClient, err := mesherykube.New([]byte(kubeconfig)) + if err != nil { + errs = append(errs, err) + return + } + err = kClient.ApplyHelmChart(mesherykube.ApplyHelmChartConfig{ + ChartLocation: mesherykube.HelmChartLocation{ + Repository: repo, + Chart: chart, + Version: version, + }, + Namespace: namespace, + Action: act, + CreateNamespace: true, + }) + if err != nil { + errs = append(errs, err) + return + } + }(kubeconfig) + } + wg.Wait() + if len(errs) != 0 { + return mergeErrors(errs) + } + return nil } diff --git a/osm/oam.go b/osm/oam.go index 417e403b..10e138ec 100644 --- a/osm/oam.go +++ b/osm/oam.go @@ -9,10 +9,10 @@ import ( ) // CompHandler is the type for functions which can handle OAM components -type CompHandler func(*Handler, v1alpha1.Component, bool) (string, error) +type CompHandler func(*Handler, v1alpha1.Component, bool, []string) (string, error) // HandleComponents handles the processing of OAM components -func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool) (string, error) { +func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool, kubeconfigs []string) (string, error) { var errs []error var msgs []string @@ -23,7 +23,7 @@ func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool) (stri for _, comp := range comps { fnc, ok := compFuncMap[comp.Spec.Type] if !ok { - msg, err := handleOSMCoreComponent(h, comp, isDel, "", "") + msg, err := handleOSMCoreComponent(h, comp, isDel, "", "", kubeconfigs) if err != nil { errs = append(errs, err) continue @@ -33,7 +33,7 @@ func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool) (stri continue } - msg, err := fnc(h, comp, isDel) + msg, err := fnc(h, comp, isDel, kubeconfigs) if err != nil { errs = append(errs, err) continue @@ -50,14 +50,14 @@ func (h *Handler) HandleComponents(comps []v1alpha1.Component, isDel bool) (stri } // HandleApplicationConfiguration handles the processing of OAM application configuration -func (h *Handler) HandleApplicationConfiguration(config v1alpha1.Configuration, isDel bool) (string, error) { +func (h *Handler) HandleApplicationConfiguration(config v1alpha1.Configuration, isDel bool, kubeconfigs []string) (string, error) { var errs []error var msgs []string for _, comp := range config.Spec.Components { for _, trait := range comp.Traits { if trait.Name == "automaticSidecarInjection.OSM" { namespaces := castSliceInterfaceToSliceString(trait.Properties["namespaces"].([]interface{})) - if err := handleNamespaceLabel(h, namespaces, isDel); err != nil { + if err := handleNamespaceLabel(h, namespaces, isDel, kubeconfigs); err != nil { errs = append(errs, err) } } @@ -74,10 +74,10 @@ func (h *Handler) HandleApplicationConfiguration(config v1alpha1.Configuration, } -func handleNamespaceLabel(h *Handler, namespaces []string, isDel bool) error { +func handleNamespaceLabel(h *Handler, namespaces []string, isDel bool, kubeconfigs []string) error { var errs []error for _, ns := range namespaces { - if err := h.sidecarInjection(ns, isDel); err != nil { + if err := h.sidecarInjection(ns, isDel, kubeconfigs); err != nil { errs = append(errs, err) } } @@ -85,13 +85,13 @@ func handleNamespaceLabel(h *Handler, namespaces []string, isDel bool) error { return mergeErrors(errs) } -func handleComponentOSMMesh(h *Handler, comp v1alpha1.Component, isDel bool) (string, error) { +func handleComponentOSMMesh(h *Handler, comp v1alpha1.Component, isDel bool, kubeconfigs []string) (string, error) { // Get the osm version from the settings // we are sure that the version of osm would be present // because the configuration is already validated against the schema version := comp.Spec.Settings["version"].(string) - msg, err := h.installOSM(isDel, version, comp.Namespace) + msg, err := h.installOSM(isDel, version, comp.Namespace, kubeconfigs) if err != nil { return fmt.Sprintf("%s: %s", comp.Name, msg), err } @@ -104,7 +104,8 @@ func handleOSMCoreComponent( comp v1alpha1.Component, isDel bool, apiVersion, - kind string) (string, error) { + kind string, + kubeconfigs []string) (string, error) { if apiVersion == "" { apiVersion = getAPIVersionFromComponent(comp) if apiVersion == "" { @@ -143,7 +144,7 @@ func handleOSMCoreComponent( msg = fmt.Sprintf("deleted %s config \"%s\" in namespace \"%s\"", kind, comp.Name, comp.Namespace) } - return msg, h.applyManifest(yamlByt, isDel, comp.Namespace) + return msg, h.applyManifest(yamlByt, isDel, comp.Namespace, kubeconfigs) } func getAPIVersionFromComponent(comp v1alpha1.Component) string { diff --git a/osm/operations.go b/osm/operations.go index de8f4288..541dbefc 100644 --- a/osm/operations.go +++ b/osm/operations.go @@ -11,7 +11,9 @@ import ( ) // ApplyOperation function contains the operation handlers -func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest) error { +func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest, hchan *chan interface{}) error { + h.SetChannel(hchan) + kubeconfigs := request.K8sConfigs operations := make(adapter.Operations) err := h.Config.GetObject(adapter.OperationsKey, &operations) if err != nil { @@ -29,7 +31,7 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR case internalconfig.OSMOperation: go func(hh *Handler, ee *adapter.Event) { version := string(operations[request.OperationName].Versions[0]) - stat, err := hh.installOSM(request.IsDeleteOperation, version, request.Namespace) + stat, err := hh.installOSM(request.IsDeleteOperation, version, request.Namespace, kubeconfigs) if err != nil { e.Summary = fmt.Sprintf("Error while %s Open service mesh", stat) e.Details = err.Error() @@ -47,7 +49,7 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR common.EmojiVotoOperation: go func(hh *Handler, ee *adapter.Event) { appName := operations[request.OperationName].AdditionalProperties[common.ServiceName] - stat, err := hh.installSampleApp(request.IsDeleteOperation, request.Namespace, operations[request.OperationName].Templates) + stat, err := hh.installSampleApp(request.IsDeleteOperation, request.Namespace, operations[request.OperationName].Templates, kubeconfigs) if err != nil { e.Summary = fmt.Sprintf("Error while %s %s application", stat, appName) e.Details = err.Error() @@ -66,6 +68,7 @@ func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationR request.IsDeleteOperation, version, operations[request.OperationName].Templates, + kubeconfigs, ) if err != nil { e.Summary = fmt.Sprintf("Error while %s %s application", stat, appName) diff --git a/osm/osm.go b/osm/osm.go index 0c3d4733..425159ce 100644 --- a/osm/osm.go +++ b/osm/osm.go @@ -33,15 +33,16 @@ type Handler struct { func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler) adapter.Handler { return &Handler{ Adapter: adapter.Adapter{ - Config: config, - Log: log, - KubeconfigHandler: kc, + Config: config, + Log: log, }, } } // ProcessOAM will handles the grpc invocation for handling OAM objects -func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (string, error) { +func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) { + kubeconfigs := oamReq.K8sConfigs + h.SetChannel(hchan) var comps []v1alpha1.Component for _, acomp := range oamReq.OamComps { comp, err := oam.ParseApplicationComponent(acomp) @@ -61,13 +62,13 @@ func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (st // If operation is delete then first HandleConfiguration and then handle the deployment if oamReq.DeleteOp { // Process configuration - msg2, err := h.HandleApplicationConfiguration(config, oamReq.DeleteOp) + msg2, err := h.HandleApplicationConfiguration(config, oamReq.DeleteOp, kubeconfigs) if err != nil { return msg2, ErrProcessOAM(err) } // Process components - msg1, err := h.HandleComponents(comps, oamReq.DeleteOp) + msg1, err := h.HandleComponents(comps, oamReq.DeleteOp, kubeconfigs) if err != nil { return msg1 + "\n" + msg2, ErrProcessOAM(err) } @@ -76,13 +77,13 @@ func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest) (st } // Process components - msg1, err := h.HandleComponents(comps, oamReq.DeleteOp) + msg1, err := h.HandleComponents(comps, oamReq.DeleteOp, kubeconfigs) if err != nil { return msg1, ErrProcessOAM(err) } // Process configuration - msg2, err := h.HandleApplicationConfiguration(config, oamReq.DeleteOp) + msg2, err := h.HandleApplicationConfiguration(config, oamReq.DeleteOp, kubeconfigs) if err != nil { return msg1 + "\n" + msg2, ErrProcessOAM(err) } diff --git a/osm/sample_apps.go b/osm/sample_apps.go index b186c4d9..ce39132c 100644 --- a/osm/sample_apps.go +++ b/osm/sample_apps.go @@ -3,6 +3,7 @@ package osm import ( "context" "fmt" + "sync" "github.com/layer5io/meshery-adapter-library/adapter" "github.com/layer5io/meshery-adapter-library/status" @@ -14,7 +15,7 @@ import ( const noneNamespace = "" // installOSMBookStoreSampleApp installs or uninstalls the default OSM bookstore application -func (h *Handler) installOSMBookStoreSampleApp(del bool, version string, templates []adapter.Template) (string, error) { +func (h *Handler) installOSMBookStoreSampleApp(del bool, version string, templates []adapter.Template, kubeconfigs []string) (string, error) { st := status.Installing if del { st = status.Removing @@ -29,16 +30,16 @@ func (h *Handler) installOSMBookStoreSampleApp(del bool, version string, templat // Add the namespaces for sidecar injection for _, ns := range namespaces { - if err := createNS(h, ns, del); err != nil { + if err := createNS(h, ns, del, kubeconfigs); err != nil { return st, ErrCreatingNS(err) } - if err := h.sidecarInjection(ns, del); err != nil { + if err := h.sidecarInjection(ns, del, kubeconfigs); err != nil { return st, ErrSidecarInjection(err) } } // Install the manifests - st, err := h.installSampleApp(del, noneNamespace, templates) + st, err := h.installSampleApp(del, noneNamespace, templates, kubeconfigs) if err != nil { return st, err } @@ -46,13 +47,13 @@ func (h *Handler) installOSMBookStoreSampleApp(del bool, version string, templat return st, nil } -func (h *Handler) installSampleApp(del bool, namespace string, templates []adapter.Template) (string, error) { +func (h *Handler) installSampleApp(del bool, namespace string, templates []adapter.Template, kubeconfigs []string) (string, error) { st := status.Installing if del { st = status.Removing } for _, template := range templates { - err := h.applyManifest([]byte(template.String()), del, namespace) + err := h.applyManifest([]byte(template.String()), del, namespace, kubeconfigs) if err != nil { return st, ErrSampleApp(err) } @@ -61,49 +62,60 @@ func (h *Handler) installSampleApp(del bool, namespace string, templates []adapt } // sidecarInjection enables/disables sidecar injection on a namespace -func (h *Handler) sidecarInjection(namespace string, del bool) error { - kclient := h.KubeClient - if kclient == nil { - return ErrNilClient +func (h *Handler) sidecarInjection(namespace string, del bool, kubeconfigs []string) error { + var wg sync.WaitGroup + var errs []error + for _, k8sconfig := range kubeconfigs { + wg.Add(1) + go func(k8sconfig string) { + kClient, err := mesherykube.New([]byte(k8sconfig)) + if err != nil { + errs = append(errs, err) + return + } + // updating the label on the namespace + ns, err := kClient.KubeClient.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) + if err != nil { + errs = append(errs, err) + return + } + + if ns.ObjectMeta.Labels == nil { + ns.ObjectMeta.Labels = map[string]string{} + } + ns.ObjectMeta.Labels["openservicemesh.io/monitored-by"] = "osm" + + if del { + delete(ns.ObjectMeta.Labels, "openservicemesh.io/monitored-by") + } + + // updating the annotations on the namespace + if ns.ObjectMeta.Annotations == nil { + ns.ObjectMeta.Annotations = map[string]string{} + } + ns.ObjectMeta.Annotations["openservicemesh.io/sidecar-injection"] = "enabled" + + if del { + delete(ns.ObjectMeta.Annotations, "openservicemesh.io/sidecar-injection") + } + + fmt.Println(ns.ObjectMeta) + + _, err = kClient.KubeClient.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{}) + if err != nil { + errs = append(errs, err) + return + } + }(k8sconfig) + } + if len(errs) != 0 { + return ErrLoadNamespace(mergeErrors(errs), namespace) } - - // updating the label on the namespace - ns, err := kclient.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) - if err != nil { - return ErrLoadNamespace(err, namespace) - } - - if ns.ObjectMeta.Labels == nil { - ns.ObjectMeta.Labels = map[string]string{} - } - ns.ObjectMeta.Labels["openservicemesh.io/monitored-by"] = "osm" - - if del { - delete(ns.ObjectMeta.Labels, "openservicemesh.io/monitored-by") - } - - // updating the annotations on the namespace - if ns.ObjectMeta.Annotations == nil { - ns.ObjectMeta.Annotations = map[string]string{} - } - ns.ObjectMeta.Annotations["openservicemesh.io/sidecar-injection"] = "enabled" - - if del { - delete(ns.ObjectMeta.Annotations, "openservicemesh.io/sidecar-injection") - } - - fmt.Println(ns.ObjectMeta) - - _, err = kclient.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{}) - if err != nil { - return ErrLoadNamespace(err, namespace) - } - return nil } // createNS handles the creatin as well as deletion of namespaces -func createNS(h *Handler, ns string, del bool) error { +func createNS(h *Handler, ns string, del bool, kubeconfigs []string) error { manifest := fmt.Sprintf(` apiVersion: v1 kind: Namespace @@ -113,28 +125,37 @@ metadata: ns, ) - if err := h.applyManifest([]byte(manifest), del, noneNamespace); err != nil { + if err := h.applyManifest([]byte(manifest), del, noneNamespace, kubeconfigs); err != nil { return err } return nil } -func (h *Handler) applyManifest(contents []byte, isDel bool, namespace string) error { - kclient := h.MesheryKubeclient - if kclient == nil { - return ErrNilClient +func (h *Handler) applyManifest(contents []byte, isDel bool, namespace string, kubeconfigs []string) error { + var wg sync.WaitGroup + var errs []error + for _, k8sconfig := range kubeconfigs { + wg.Add(1) + go func(k8sconfig string) { + kClient, err := mesherykube.New([]byte(k8sconfig)) + if err != nil { + errs = append(errs, err) + return + } + err = kClient.ApplyManifest(contents, mesherykube.ApplyOptions{ + Namespace: namespace, + Update: true, + Delete: isDel, + }) + if err != nil { + errs = append(errs, err) + return + } + }(k8sconfig) + } + if len(errs) != 0 { + return ErrLoadNamespace(mergeErrors(errs), namespace) } - - err := kclient.ApplyManifest(contents, mesherykube.ApplyOptions{ - Namespace: namespace, - Update: true, - Delete: isDel, - }) - - if err != nil { - return err - } - return nil } From 60eb74ab1167a9b7f81023070e00e2c81755bac7 Mon Sep 17 00:00:00 2001 From: ashish Date: Tue, 31 May 2022 21:36:58 +0530 Subject: [PATCH 2/6] update meshery adapter library Signed-off-by: ashish --- go.mod | 3 +-- go.sum | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 063fb68d..fe0c1bca 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,10 @@ replace ( github.com/kudobuilder/kuttl => github.com/layer5io/kuttl v0.4.1-0.20200806180306-b7e46afd657f go.opentelemetry.io/otel v1.3.0 => go.opentelemetry.io/otel v0.11.0 go.opentelemetry.io/otel/sdk v1.3.0 => go.opentelemetry.io/otel/sdk v0.11.0 - github.com/layer5io/meshery-adapter-library => ../meshery-adapter-library ) require ( - github.com/layer5io/meshery-adapter-library v0.5.4 + github.com/layer5io/meshery-adapter-library v0.5.5 github.com/layer5io/meshkit v0.5.17 github.com/layer5io/service-mesh-performance v0.3.4 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index d80f1027..6b81508c 100644 --- a/go.sum +++ b/go.sum @@ -876,6 +876,8 @@ github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3 github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw= github.com/layer5io/meshery-adapter-library v0.5.4 h1:QQ+nVGHd7KhV58KhY40V00kC+IEM4+AlOhQcSHSbOUE= github.com/layer5io/meshery-adapter-library v0.5.4/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M= +github.com/layer5io/meshery-adapter-library v0.5.5 h1:4dGsHBDCLnkOOA/RaUM5n4bPdEdySREnkd3raU9LSDI= +github.com/layer5io/meshery-adapter-library v0.5.5/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M= github.com/layer5io/meshkit v0.5.16/go.mod h1:tj5TAjty7T/WJ8YvlDfOZF94t4g3mhWuKBCc6MOUoNU= github.com/layer5io/meshkit v0.5.17 h1:QnNuIj5CVLfaZyznEAMrwt51QImpVpQ8BTcOZOIOWhI= github.com/layer5io/meshkit v0.5.17/go.mod h1:tj5TAjty7T/WJ8YvlDfOZF94t4g3mhWuKBCc6MOUoNU= From 0f9f7a75b5f3e81dfd01179afb335c052a64862d Mon Sep 17 00:00:00 2001 From: ashish Date: Sun, 5 Jun 2022 22:19:34 +0530 Subject: [PATCH 3/6] Added error mutex Signed-off-by: ashish --- osm/install.go | 5 +++++ osm/sample_apps.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/osm/install.go b/osm/install.go index 9af7ca97..47cd559b 100644 --- a/osm/install.go +++ b/osm/install.go @@ -63,6 +63,7 @@ func (h *Handler) applyHelmChart(del bool, version, namespace string, kubeconfig act = mesherykube.INSTALL } var wg sync.WaitGroup + var errMx sync.Mutex var errs []error for _, kubeconfig := range kubeconfigs { wg.Add(1) @@ -70,7 +71,9 @@ func (h *Handler) applyHelmChart(del bool, version, namespace string, kubeconfig defer wg.Done() kClient, err := mesherykube.New([]byte(kubeconfig)) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } err = kClient.ApplyHelmChart(mesherykube.ApplyHelmChartConfig{ @@ -84,7 +87,9 @@ func (h *Handler) applyHelmChart(del bool, version, namespace string, kubeconfig CreateNamespace: true, }) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } }(kubeconfig) diff --git a/osm/sample_apps.go b/osm/sample_apps.go index ce39132c..7e3a1659 100644 --- a/osm/sample_apps.go +++ b/osm/sample_apps.go @@ -65,18 +65,24 @@ func (h *Handler) installSampleApp(del bool, namespace string, templates []adapt func (h *Handler) sidecarInjection(namespace string, del bool, kubeconfigs []string) error { var wg sync.WaitGroup var errs []error + var errMx sync.Mutex for _, k8sconfig := range kubeconfigs { wg.Add(1) go func(k8sconfig string) { + defer wg.Done() kClient, err := mesherykube.New([]byte(k8sconfig)) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } // updating the label on the namespace ns, err := kClient.KubeClient.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } @@ -103,11 +109,14 @@ func (h *Handler) sidecarInjection(namespace string, del bool, kubeconfigs []str _, err = kClient.KubeClient.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{}) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } }(k8sconfig) } + wg.Wait() if len(errs) != 0 { return ErrLoadNamespace(mergeErrors(errs), namespace) } @@ -135,12 +144,16 @@ metadata: func (h *Handler) applyManifest(contents []byte, isDel bool, namespace string, kubeconfigs []string) error { var wg sync.WaitGroup var errs []error + var errMx sync.Mutex for _, k8sconfig := range kubeconfigs { wg.Add(1) go func(k8sconfig string) { + defer wg.Done() kClient, err := mesherykube.New([]byte(k8sconfig)) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } err = kClient.ApplyManifest(contents, mesherykube.ApplyOptions{ @@ -149,11 +162,14 @@ func (h *Handler) applyManifest(contents []byte, isDel bool, namespace string, k Delete: isDel, }) if err != nil { + errMx.Lock() errs = append(errs, err) + errMx.Unlock() return } }(k8sconfig) } + wg.Wait() if len(errs) != 0 { return ErrLoadNamespace(mergeErrors(errs), namespace) } From fb0f7f876e644dbd1380248a965f17a3947d1a7b Mon Sep 17 00:00:00 2001 From: ashish Date: Sat, 11 Jun 2022 17:14:01 +0530 Subject: [PATCH 4/6] go mod tidy Signed-off-by: ashish --- go.mod | 2 +- go.sum | 2 ++ osm/osm.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 8b3e700b..bbcfbb0b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ replace ( ) require ( - github.com/layer5io/meshery-adapter-library v0.5.5 + github.com/layer5io/meshery-adapter-library v0.5.6 github.com/layer5io/meshkit v0.5.20 github.com/layer5io/service-mesh-performance v0.3.4 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index d7284542..ea99f1dd 100644 --- a/go.sum +++ b/go.sum @@ -887,6 +887,8 @@ github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3 github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw= github.com/layer5io/meshery-adapter-library v0.5.5 h1:4dGsHBDCLnkOOA/RaUM5n4bPdEdySREnkd3raU9LSDI= github.com/layer5io/meshery-adapter-library v0.5.5/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M= +github.com/layer5io/meshery-adapter-library v0.5.6 h1:pbZTMkWNcGWPk314K7WhO4UGVxSnKvGLmwQXBWZ05GI= +github.com/layer5io/meshery-adapter-library v0.5.6/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M= github.com/layer5io/meshkit v0.5.16/go.mod h1:tj5TAjty7T/WJ8YvlDfOZF94t4g3mhWuKBCc6MOUoNU= github.com/layer5io/meshkit v0.5.20 h1:QpN/SEepUZk+Jj2K4TBRZJCRr/pzuvHqDaUr30vWddI= github.com/layer5io/meshkit v0.5.20/go.mod h1:EUfXIcztap9Dh0Ao3Dmoxf3FMsm4h7zFHGwagj+5ra4= diff --git a/osm/osm.go b/osm/osm.go index 425159ce..aa52c9d5 100644 --- a/osm/osm.go +++ b/osm/osm.go @@ -22,6 +22,8 @@ import ( meshkitCfg "github.com/layer5io/meshkit/config" "github.com/layer5io/meshkit/logger" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" + "github.com/layer5io/meskit/models" + "gopkg.in/yaml.v2" ) // Handler instance for this adapter @@ -39,6 +41,51 @@ func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler) a } } +//CreateKubeconfigs creates and writes passed kubeconfig onto the filesystem +func (osm *Handler) CreateKubeconfigs(kubeconfigs []string) error { + var errs = make([]error, 0) + for _, kubeconfig := range kubeconfigs { + kconfig := models.Kubeconfig{} + err := yaml.Unmarshal([]byte(kubeconfig), &kconfig) + if err != nil { + errs = append(errs, err) + continue + } + + // To have control over what exactly to take in on kubeconfig + osm.KubeconfigHandler.SetKey("kind", kconfig.Kind) + osm.KubeconfigHandler.SetKey("apiVersion", kconfig.APIVersion) + osm.KubeconfigHandler.SetKey("current-context", kconfig.CurrentContext) + err = osm.KubeconfigHandler.SetObject("preferences", kconfig.Preferences) + if err != nil { + errs = append(errs, err) + continue + } + + err = osm.KubeconfigHandler.SetObject("clusters", kconfig.Clusters) + if err != nil { + errs = append(errs, err) + continue + } + + err = osm.KubeconfigHandler.SetObject("users", kconfig.Users) + if err != nil { + errs = append(errs, err) + continue + } + + err = osm.KubeconfigHandler.SetObject("contexts", kconfig.Contexts) + if err != nil { + errs = append(errs, err) + continue + } + } + if len(errs) == 0 { + return nil + } + return mergeErrors(errs) +} + // ProcessOAM will handles the grpc invocation for handling OAM objects func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) { kubeconfigs := oamReq.K8sConfigs From 883acf1ab5d2572b81b38c944d226c771a7eba96 Mon Sep 17 00:00:00 2001 From: ashish Date: Sat, 11 Jun 2022 17:15:59 +0530 Subject: [PATCH 5/6] final changes Signed-off-by: ashish --- osm/operations.go | 6 +++++- osm/osm.go | 11 ++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/osm/operations.go b/osm/operations.go index 541dbefc..a7db0041 100644 --- a/osm/operations.go +++ b/osm/operations.go @@ -12,10 +12,14 @@ import ( // ApplyOperation function contains the operation handlers func (h *Handler) ApplyOperation(ctx context.Context, request adapter.OperationRequest, hchan *chan interface{}) error { + err := h.CreateKubeconfigs(request.K8sConfigs) + if err != nil { + return err + } h.SetChannel(hchan) kubeconfigs := request.K8sConfigs operations := make(adapter.Operations) - err := h.Config.GetObject(adapter.OperationsKey, &operations) + err = h.Config.GetObject(adapter.OperationsKey, &operations) if err != nil { return err } diff --git a/osm/osm.go b/osm/osm.go index aa52c9d5..d7d748bf 100644 --- a/osm/osm.go +++ b/osm/osm.go @@ -21,8 +21,8 @@ import ( "github.com/layer5io/meshery-osm/osm/oam" meshkitCfg "github.com/layer5io/meshkit/config" "github.com/layer5io/meshkit/logger" + "github.com/layer5io/meshkit/models" "github.com/layer5io/meshkit/models/oam/core/v1alpha1" - "github.com/layer5io/meskit/models" "gopkg.in/yaml.v2" ) @@ -35,8 +35,9 @@ type Handler struct { func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler) adapter.Handler { return &Handler{ Adapter: adapter.Adapter{ - Config: config, - Log: log, + Config: config, + Log: log, + KubeconfigHandler: kc, }, } } @@ -88,6 +89,10 @@ func (osm *Handler) CreateKubeconfigs(kubeconfigs []string) error { // ProcessOAM will handles the grpc invocation for handling OAM objects func (h *Handler) ProcessOAM(ctx context.Context, oamReq adapter.OAMRequest, hchan *chan interface{}) (string, error) { + err := h.CreateKubeconfigs(oamReq.K8sConfigs) + if err != nil { + return "", err + } kubeconfigs := oamReq.K8sConfigs h.SetChannel(hchan) var comps []v1alpha1.Component From d5cf7be26ccba584f99577d5b176d42f775dd2cb Mon Sep 17 00:00:00 2001 From: ashish Date: Sat, 11 Jun 2022 17:40:11 +0530 Subject: [PATCH 6/6] static fix Signed-off-by: ashish --- go.sum | 2 -- osm/osm.go | 16 ++++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/go.sum b/go.sum index ea99f1dd..6f72a865 100644 --- a/go.sum +++ b/go.sum @@ -885,8 +885,6 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm github.com/layer5io/kuttl v0.4.1-0.20200806180306-b7e46afd657f/go.mod h1:UmrVd7x+bNVKrpmKgTtfRiTKHZeNPcMjQproJ0vGwhE= github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34 h1:QaViadDOBCMDUwYx78kfRvHMkzRVnh/GOhm3s2gxoP4= github.com/layer5io/learn-layer5/smi-conformance v0.0.0-20210317075357-06b4f88b3e34/go.mod h1:BQPLwdJt7v7y0fXIejI4whR9zMyX07Wjt5xrbgEmHLw= -github.com/layer5io/meshery-adapter-library v0.5.5 h1:4dGsHBDCLnkOOA/RaUM5n4bPdEdySREnkd3raU9LSDI= -github.com/layer5io/meshery-adapter-library v0.5.5/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M= github.com/layer5io/meshery-adapter-library v0.5.6 h1:pbZTMkWNcGWPk314K7WhO4UGVxSnKvGLmwQXBWZ05GI= github.com/layer5io/meshery-adapter-library v0.5.6/go.mod h1:YmLV0w6ucBagrqUB0x9q8ZVXrhN1tJBP5j+Pu6LOY/M= github.com/layer5io/meshkit v0.5.16/go.mod h1:tj5TAjty7T/WJ8YvlDfOZF94t4g3mhWuKBCc6MOUoNU= diff --git a/osm/osm.go b/osm/osm.go index d7d748bf..db8ddd22 100644 --- a/osm/osm.go +++ b/osm/osm.go @@ -43,7 +43,7 @@ func New(config meshkitCfg.Handler, log logger.Handler, kc meshkitCfg.Handler) a } //CreateKubeconfigs creates and writes passed kubeconfig onto the filesystem -func (osm *Handler) CreateKubeconfigs(kubeconfigs []string) error { +func (h *Handler) CreateKubeconfigs(kubeconfigs []string) error { var errs = make([]error, 0) for _, kubeconfig := range kubeconfigs { kconfig := models.Kubeconfig{} @@ -54,28 +54,28 @@ func (osm *Handler) CreateKubeconfigs(kubeconfigs []string) error { } // To have control over what exactly to take in on kubeconfig - osm.KubeconfigHandler.SetKey("kind", kconfig.Kind) - osm.KubeconfigHandler.SetKey("apiVersion", kconfig.APIVersion) - osm.KubeconfigHandler.SetKey("current-context", kconfig.CurrentContext) - err = osm.KubeconfigHandler.SetObject("preferences", kconfig.Preferences) + h.KubeconfigHandler.SetKey("kind", kconfig.Kind) + h.KubeconfigHandler.SetKey("apiVersion", kconfig.APIVersion) + h.KubeconfigHandler.SetKey("current-context", kconfig.CurrentContext) + err = h.KubeconfigHandler.SetObject("preferences", kconfig.Preferences) if err != nil { errs = append(errs, err) continue } - err = osm.KubeconfigHandler.SetObject("clusters", kconfig.Clusters) + err = h.KubeconfigHandler.SetObject("clusters", kconfig.Clusters) if err != nil { errs = append(errs, err) continue } - err = osm.KubeconfigHandler.SetObject("users", kconfig.Users) + err = h.KubeconfigHandler.SetObject("users", kconfig.Users) if err != nil { errs = append(errs, err) continue } - err = osm.KubeconfigHandler.SetObject("contexts", kconfig.Contexts) + err = h.KubeconfigHandler.SetObject("contexts", kconfig.Contexts) if err != nil { errs = append(errs, err) continue