From da1d38325cfe832c321faa8ede2dc695795278b0 Mon Sep 17 00:00:00 2001 From: Delyan Raychev Date: Wed, 23 Oct 2019 23:23:46 -0700 Subject: [PATCH] Simplify Updating ingress IP address; Do only if new IP (#625) --- pkg/azure/client.go | 51 +++++++++--- pkg/controller/mutate_aks.go | 79 +++++++++++-------- ...app_gateway_test.go => mutate_aks_test.go} | 36 +++------ pkg/controller/mutate_app_gateway.go | 43 +++++----- pkg/controller/prune_test.go | 4 +- pkg/k8scontext/context.go | 14 +++- pkg/worker/fake.go | 16 ++-- pkg/worker/types.go | 4 +- pkg/worker/worker.go | 16 ++-- pkg/worker/worker_test.go | 9 ++- 10 files changed, 158 insertions(+), 114 deletions(-) rename pkg/controller/{mutate_app_gateway_test.go => mutate_aks_test.go} (78%) diff --git a/pkg/azure/client.go b/pkg/azure/client.go index a128c43f4..fe4431101 100644 --- a/pkg/azure/client.go +++ b/pkg/azure/client.go @@ -41,6 +41,7 @@ type azClient struct { subscriptionID SubscriptionID resourceGroupName ResourceGroup appGwName ResourceName + memoizedIPs map[string]n.PublicIPAddress ctx context.Context } @@ -55,30 +56,44 @@ func NewAzClient(subscriptionID SubscriptionID, resourceGroupName ResourceGroup, subnetsClient: n.NewSubnetsClient(string(subscriptionID)), groupsClient: r.NewGroupsClient(string(subscriptionID)), deploymentsClient: r.NewDeploymentsClient(string(subscriptionID)), - subscriptionID: subscriptionID, - resourceGroupName: resourceGroupName, - appGwName: appGwName, + + subscriptionID: subscriptionID, + resourceGroupName: resourceGroupName, + appGwName: appGwName, + memoizedIPs: make(map[string]n.PublicIPAddress), ctx: context.Background(), authorizer: authorizer, } - az.appGatewaysClient.AddToUserAgent(userAgent) + if err := az.appGatewaysClient.AddToUserAgent(userAgent); err != nil { + glog.Error("Error adding User Agent to App Gateway client: ", userAgent) + } az.appGatewaysClient.Authorizer = az.authorizer - az.publicIPsClient.AddToUserAgent(userAgent) + if err := az.publicIPsClient.AddToUserAgent(userAgent); err != nil { + glog.Error("Error adding User Agent to Public IP client: ", userAgent) + } az.publicIPsClient.Authorizer = az.authorizer - az.virtualNetworksClient.AddToUserAgent(userAgent) + + if err := az.virtualNetworksClient.AddToUserAgent(userAgent); err != nil { + glog.Error("Error adding User Agent to Virtual Networks client: ", userAgent) + } az.virtualNetworksClient.Authorizer = az.authorizer - az.subnetsClient.AddToUserAgent(userAgent) + + if err := az.subnetsClient.AddToUserAgent(userAgent); err != nil { + glog.Error("Error adding User Agent to Subnets client: ", userAgent) + } az.subnetsClient.Authorizer = az.authorizer - az.groupsClient.AddToUserAgent(userAgent) - az.groupsClient.Authorizer = az.authorizer - az.publicIPsClient.AddToUserAgent(userAgent) - az.publicIPsClient.Authorizer = az.authorizer + if err := az.groupsClient.AddToUserAgent(userAgent); err != nil { + glog.Error("Error adding User Agent to Groups client: ", userAgent) + } + az.groupsClient.Authorizer = az.authorizer - az.deploymentsClient.AddToUserAgent(userAgent) + if err := az.deploymentsClient.AddToUserAgent(userAgent); err != nil { + glog.Error("Error adding User Agent to Deployments client: ", userAgent) + } az.deploymentsClient.Authorizer = az.authorizer return az @@ -100,8 +115,18 @@ func (az *azClient) UpdateGateway(appGwObj *n.ApplicationGateway) (err error) { } func (az *azClient) GetPublicIP(resourceID string) (n.PublicIPAddress, error) { + if ip, ok := az.memoizedIPs[resourceID]; ok { + return ip, nil + } + _, resourceGroupName, publicIPName := ParseResourceID(resourceID) - return az.publicIPsClient.Get(az.ctx, string(resourceGroupName), string(publicIPName), "") + + ip, err := az.publicIPsClient.Get(az.ctx, string(resourceGroupName), string(publicIPName), "") + if err != nil { + return n.PublicIPAddress{}, err + } + az.memoizedIPs[resourceID] = ip + return ip, nil } // DeployGateway is a method that deploy the appgw and related resources diff --git a/pkg/controller/mutate_aks.go b/pkg/controller/mutate_aks.go index 36c82facf..350c16d1b 100644 --- a/pkg/controller/mutate_aks.go +++ b/pkg/controller/mutate_aks.go @@ -13,64 +13,81 @@ import ( "github.com/Azure/application-gateway-kubernetes-ingress/pkg/annotations" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw" + "github.com/Azure/application-gateway-kubernetes-ingress/pkg/azure" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/events" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext" ) +type ipResource string +type ipAddress string + // MutateAKS applies changes to Kubernetes resources. -func (c AppGwIngressController) MutateAKS(event events.Event) error { +func (c AppGwIngressController) MutateAKS() error { + appGw, cbCtx, err := c.getAppGw() + if err != nil { + return err + } + + ips := getIPsFromAppGateway(appGw, c.azClient) + + // update all relevant ingresses with IP address obtained from existing App Gateway configuration + cbCtx.IngressList = c.PruneIngress(appGw, cbCtx) + for _, ingress := range cbCtx.IngressList { + c.updateIngressStatus(appGw, cbCtx, ingress, ips) + } return nil } -func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, event events.Event) { - ingress, ok := event.Value.(*v1beta1.Ingress) - if !ok { - return - } +func (c AppGwIngressController) updateIngressStatus(appGw *n.ApplicationGateway, cbCtx *appgw.ConfigBuilderContext, ingress *v1beta1.Ingress, ips map[ipResource]ipAddress) { - // check if this ingress is for AGIC or not, it might have been updated - if !k8scontext.IsIngressApplicationGateway(ingress) || !cbCtx.InIngressList(ingress) { - if err := c.k8sContext.UpdateIngressStatus(*ingress, ""); err != nil { - c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) - } + // determine what ipAddress to attach + usePrivateIP, _ := annotations.UsePrivateIP(ingress) + usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true" + + ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP) + if ipConf == nil { + glog.V(9).Info("[mutate_aks] No IP config for App Gwy: ", appGw.Name) return } - // determine what ip to attach - usePrivateIP, _ := annotations.UsePrivateIP(ingress) - usePrivateIP = usePrivateIP || cbCtx.EnvVariables.UsePrivateIP == "true" - if ipConf := appgw.LookupIPConfigurationByType(appGw.FrontendIPConfigurations, usePrivateIP); ipConf != nil { - if ipAddress, ok := c.ipAddressMap[*ipConf.ID]; ok { - if err := c.k8sContext.UpdateIngressStatus(*ingress, ipAddress); err != nil { - c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) - } + glog.V(5).Infof("[mutate_aks] Resolving IP for ID (%s)", *ipConf.ID) + if newIP, found := ips[ipResource(*ipConf.ID)]; found { + if err := c.k8sContext.UpdateIngressStatus(*ingress, k8scontext.IPAddress(newIP)); err != nil { + c.recorder.Event(ingress, v1.EventTypeWarning, events.ReasonUnableToUpdateIngressStatus, err.Error()) + glog.Errorf("[mutate_aks] Error updating ingress %s/%s IP to %+v", ingress.Namespace, ingress.Name, newIP) + return } + glog.V(5).Infof("[mutate_aks] Updated Ingress %s/%s IP to %+v", ingress.Namespace, ingress.Name, newIP) } } -func (c AppGwIngressController) updateIPAddressMap(appGw *n.ApplicationGateway) { +func getIPsFromAppGateway(appGw *n.ApplicationGateway, azClient azure.AzClient) map[ipResource]ipAddress { + ips := make(map[ipResource]ipAddress) for _, ipConf := range *appGw.FrontendIPConfigurations { - if _, ok := c.ipAddressMap[*ipConf.ID]; ok { - return + ipID := ipResource(*ipConf.ID) + if _, ok := ips[ipID]; ok { + continue } if ipConf.PrivateIPAddress != nil { - c.ipAddressMap[*ipConf.ID] = k8scontext.IPAddress(*ipConf.PrivateIPAddress) - } else if ipAddress := c.getPublicIPAddress(*ipConf.PublicIPAddress.ID); ipAddress != nil { - c.ipAddressMap[*ipConf.ID] = *ipAddress + ips[ipID] = ipAddress(*ipConf.PrivateIPAddress) + } else if ipAddress := getPublicIPAddress(*ipConf.PublicIPAddress.ID, azClient); ipAddress != nil { + ips[ipID] = *ipAddress } } + glog.V(5).Infof("[mutate_aks] Found IPs: %+v", ips) + return ips } -// getPublicIPAddress gets the ip address associated to public ip on Azure -func (c AppGwIngressController) getPublicIPAddress(publicIPID string) *k8scontext.IPAddress { - // get public ip - publicIP, err := c.azClient.GetPublicIP(publicIPID) +// getPublicIPAddress gets the ipAddress address associated to public ipAddress on Azure +func getPublicIPAddress(publicIPID string, azClient azure.AzClient) *ipAddress { + // get public ipAddress + publicIP, err := azClient.GetPublicIP(publicIPID) if err != nil { - glog.Errorf("Unable to get Public IP Address %s. Error %s", publicIPID, err) + glog.Errorf("[mutate_aks] Unable to get Public IP Address %s. Error %s", publicIPID, err) return nil } - ipAddress := k8scontext.IPAddress(*publicIP.IPAddress) + ipAddress := ipAddress(*publicIP.IPAddress) return &ipAddress } diff --git a/pkg/controller/mutate_app_gateway_test.go b/pkg/controller/mutate_aks_test.go similarity index 78% rename from pkg/controller/mutate_app_gateway_test.go rename to pkg/controller/mutate_aks_test.go index 07b74f29d..4ea812e7d 100644 --- a/pkg/controller/mutate_app_gateway_test.go +++ b/pkg/controller/mutate_aks_test.go @@ -22,7 +22,6 @@ import ( "github.com/Azure/application-gateway-kubernetes-ingress/pkg/appgw" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/agic_crd_client/clientset/versioned/fake" istio_fake "github.com/Azure/application-gateway-kubernetes-ingress/pkg/crd_client/istio_crd_client/clientset/versioned/fake" - "github.com/Azure/application-gateway-kubernetes-ingress/pkg/events" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/k8scontext" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/metricstore" "github.com/Azure/application-gateway-kubernetes-ingress/pkg/tests" @@ -43,7 +42,9 @@ var _ = Describe("process function tests", func() { }, } publicIP := k8scontext.IPAddress("xxxx") - privateIP := k8scontext.IPAddress("xxxx") + privateIP := k8scontext.IPAddress("yyyy") + var ips map[ipResource]ipAddress + BeforeEach(func() { stopChannel = make(chan struct{}) @@ -82,16 +83,17 @@ var _ = Describe("process function tests", func() { DefaultAddressPoolID: to.StringPtr("xx"), DefaultHTTPSettingsID: to.StringPtr("yy"), } + + ips = map[ipResource]ipAddress{"PublicIP": "xxxx", "PrivateIP": "yyyy"} }) + AfterEach(func() { close(stopChannel) }) + Context("test updateIngressStatus", func() { - It("ensure that updateIngressStatus adds ip to ingress", func() { - controller.updateIngressStatus(&appGw, cbCtx, events.Event{ - Type: events.Update, - Value: ingress, - }) + It("ensure that updateIngressStatus adds ipAddress to ingress", func() { + controller.updateIngressStatus(&appGw, cbCtx, ingress, ips) updatedIngress, _ := k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(ingress.Name, metav1.GetOptions{}) Expect(updatedIngress.Status.LoadBalancer.Ingress).Should(ContainElement(v1.LoadBalancerIngress{ Hostname: "", @@ -100,28 +102,12 @@ var _ = Describe("process function tests", func() { Expect(len(updatedIngress.Status.LoadBalancer.Ingress)).To(Equal(1)) }) - It("ensure that updateIngressStatus removes ip to ingress not for AGIC", func() { - ingress.Annotations[annotations.IngressClassKey] = "otheric" - updatedIngress, _ := k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress) - controller.k8sContext.UpdateIngressStatus(*ingress, k8scontext.IPAddress(publicIP)) - controller.updateIngressStatus(&appGw, cbCtx, events.Event{ - Type: events.Update, - Value: ingress, - }) - updatedIngress, _ = k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(ingress.Name, metav1.GetOptions{}) - Expect(annotations.IsApplicationGatewayIngress(updatedIngress)).To(BeFalse()) - Expect(len(updatedIngress.Status.LoadBalancer.Ingress)).To(Equal(0)) - }) - - It("ensure that updateIngressStatus adds private ip when annotation is present", func() { + It("ensure that updateIngressStatus adds private ipAddress when annotation is present", func() { ingress.Annotations[annotations.UsePrivateIPKey] = "true" updatedIngress, _ := k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress) Expect(annotations.UsePrivateIP(updatedIngress)).To(BeTrue()) - controller.updateIngressStatus(&appGw, cbCtx, events.Event{ - Type: events.Update, - Value: ingress, - }) + controller.updateIngressStatus(&appGw, cbCtx, ingress, ips) updatedIngress, _ = k8sClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Get(ingress.Name, metav1.GetOptions{}) Expect(updatedIngress.Status.LoadBalancer.Ingress).Should(ContainElement(v1.LoadBalancerIngress{ diff --git a/pkg/controller/mutate_app_gateway.go b/pkg/controller/mutate_app_gateway.go index f7f2ef575..1f642b9ec 100644 --- a/pkg/controller/mutate_app_gateway.go +++ b/pkg/controller/mutate_app_gateway.go @@ -25,8 +25,7 @@ type realClock struct{} func (realClock) Now() time.Time { return time.Now() } -// MutateAppGateway applies App Gateway config. -func (c AppGwIngressController) MutateAppGateway(event events.Event) error { +func (c AppGwIngressController) getAppGw() (*n.ApplicationGateway, *appgw.ConfigBuilderContext, error) { // Get current application gateway config appGw, err := c.azClient.GetGateway() c.metricStore.IncArmAPICallCounter() @@ -36,14 +35,9 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { if c.agicPod != nil { c.recorder.Event(c.agicPod, v1.EventTypeWarning, events.ReasonUnableToFetchAppGw, errorLine) } - return ErrFetchingAppGatewayConfig + return nil, nil, ErrFetchingAppGatewayConfig } - c.updateIPAddressMap(&appGw) - - existingConfigJSON, _ := dumpSanitizedJSON(&appGw, false, to.StringPtr("-- Existing App Gwy Config --")) - glog.V(5).Info("Existing App Gateway config: ", string(existingConfigJSON)) - cbCtx := &appgw.ConfigBuilderContext{ ServiceList: c.k8sContext.ListServices(), IngressList: c.k8sContext.ListHTTPIngresses(), @@ -53,6 +47,19 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { DefaultHTTPSettingsID: to.StringPtr(c.appGwIdentifier.HTTPSettingsID(appgw.DefaultBackendHTTPSettingsName)), } + return &appGw, cbCtx, nil +} + +// MutateAppGateway applies App Gateway config. +func (c AppGwIngressController) MutateAppGateway() error { + appGw, cbCtx, err := c.getAppGw() + if err != nil { + return err + } + + existingConfigJSON, _ := dumpSanitizedJSON(appGw, false, to.StringPtr("-- Existing App Gwy Config --")) + glog.V(5).Info("Existing App Gateway config: ", string(existingConfigJSON)) + if cbCtx.EnvVariables.EnableBrownfieldDeployment { prohibitedTargets := c.k8sContext.ListAzureProhibitedTargets() if len(prohibitedTargets) > 0 { @@ -81,7 +88,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { } } - cbCtx.IngressList = c.PruneIngress(&appGw, cbCtx) + cbCtx.IngressList = c.PruneIngress(appGw, cbCtx) if cbCtx.EnvVariables.EnableIstioIntegration { var gatewaysInfo []string @@ -102,7 +109,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { } // Create a configbuilder based on current appgw config - configBuilder := appgw.NewConfigBuilder(c.k8sContext, &c.appGwIdentifier, &appGw, c.recorder, realClock{}) + configBuilder := appgw.NewConfigBuilder(c.k8sContext, &c.appGwIdentifier, appGw, c.recorder, realClock{}) // Run validations on the Kubernetes resources which can suggest misconfiguration. if err = configBuilder.PreBuildValidate(cbCtx); err != nil { @@ -133,10 +140,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { } } - if c.configIsSame(&appGw) { - // update ingresses with appgw gateway ip address - c.updateIngressStatus(generatedAppGw, cbCtx, event) - + if c.configIsSame(appGw) { glog.V(3).Info("cache: Config has NOT changed! No need to connect to ARM.") return nil } @@ -150,12 +154,12 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { if err != nil { // Reset cache c.configCache = nil - configJSON, _ := dumpSanitizedJSON(&appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil) + configJSON, _ := dumpSanitizedJSON(appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil) glogIt := glog.Errorf if cbCtx.EnvVariables.EnablePanicOnPutError { glogIt = glog.Fatalf } - errorLine := fmt.Sprintf("Failed applying App Gwy configuration: %s -- %s", err, string(configJSON)) + errorLine := fmt.Sprintf("Failed applying App Gwy configuration:\n%s\n\nerror: %s", string(configJSON), err) glogIt(errorLine) if c.agicPod != nil { c.recorder.Event(c.agicPod, v1.EventTypeWarning, events.ReasonFailedApplyingAppGwConfig, errorLine) @@ -164,7 +168,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { return err } // Wait until deployment finshes and save the error message - configJSON, _ := dumpSanitizedJSON(&appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil) + configJSON, _ := dumpSanitizedJSON(appGw, cbCtx.EnvVariables.EnableSaveConfigToFile, nil) glog.V(5).Info(string(configJSON)) // We keep this at log level 1 to show some heartbeat in the logs. Without this it is way too quiet. @@ -186,10 +190,7 @@ func (c AppGwIngressController) MutateAppGateway(event events.Event) error { } glog.V(3).Info("cache: Updated with latest applied config.") - c.updateCache(&appGw) - - // update ingresses with appgw gateway ip address - c.updateIngressStatus(generatedAppGw, cbCtx, event) + c.updateCache(appGw) c.metricStore.IncArmAPIUpdateCallSuccessCounter() diff --git a/pkg/controller/prune_test.go b/pkg/controller/prune_test.go index f009431f1..700f6ad1e 100644 --- a/pkg/controller/prune_test.go +++ b/pkg/controller/prune_test.go @@ -53,14 +53,14 @@ var _ = Describe("prune function tests", func() { } appGw := fixtures.GetAppGateway() - It("removes the ingress using private ip and keeps others", func() { + It("removes the ingress using private ipAddress and keeps others", func() { Expect(len(cbCtx.IngressList)).To(Equal(2)) prunedIngresses := pruneNoPrivateIP(controller, &appGw, cbCtx, cbCtx.IngressList) Expect(len(prunedIngresses)).To(Equal(1)) Expect(prunedIngresses[0].Annotations[annotations.UsePrivateIPKey]).To(Equal("")) }) - It("keeps the ingress using private ip when public ip is present", func() { + It("keeps the ingress using private ipAddress when public ipAddress is present", func() { appGw.FrontendIPConfigurations = &[]n.ApplicationGatewayFrontendIPConfiguration{ fixtures.GetPublicIPConfiguration(), fixtures.GetPrivateIPConfiguration(), diff --git a/pkg/k8scontext/context.go b/pkg/k8scontext/context.go index 75d32d8e4..7f00bc9e9 100644 --- a/pkg/k8scontext/context.go +++ b/pkg/k8scontext/context.go @@ -408,17 +408,25 @@ func (c *Context) GetInfrastructureResourceGroupID() (azure.SubscriptionID, azur } // UpdateIngressStatus adds IP address in Ingress Status -func (c *Context) UpdateIngressStatus(ingressToUpdate v1beta1.Ingress, address IPAddress) error { +func (c *Context) UpdateIngressStatus(ingressToUpdate v1beta1.Ingress, newIP IPAddress) error { ingressClient := c.kubeClient.ExtensionsV1beta1().Ingresses(ingressToUpdate.Namespace) ingress, err := ingressClient.Get(ingressToUpdate.Name, metav1.GetOptions{}) if err != nil { return fmt.Errorf("Unable to get ingress %s/%s", ingressToUpdate.Namespace, ingressToUpdate.Name) } + for _, lbi := range ingress.Status.LoadBalancer.Ingress { + existingIP := lbi.IP + if existingIP == string(newIP) { + glog.V(5).Infof("IP %s already set on Ingress %s/%s", lbi.IP, ingress.Namespace, ingress.Name) + return nil + } + } + loadBalancerIngresses := []v1.LoadBalancerIngress{} - if address != "" { + if newIP != "" { loadBalancerIngresses = append(loadBalancerIngresses, v1.LoadBalancerIngress{ - IP: string(address), + IP: string(newIP), }) } ingress.Status.LoadBalancer.Ingress = loadBalancerIngresses diff --git a/pkg/worker/fake.go b/pkg/worker/fake.go index fe45bce73..21d5e942c 100644 --- a/pkg/worker/fake.go +++ b/pkg/worker/fake.go @@ -11,17 +11,18 @@ import ( // FakeProcessor is fake event processor type type FakeProcessor struct { - processFunc func(events.Event) error + mutateAppGwy func() error + mutateAKS func() error } // MutateAppGateway will call the callback provided -func (fp FakeProcessor) MutateAppGateway(event events.Event) error { - return fp.processFunc(event) +func (fp FakeProcessor) MutateAppGateway() error { + return fp.mutateAppGwy() } // MutateAKS will call the callback provided -func (fp FakeProcessor) MutateAKS(event events.Event) error { - return fp.processFunc(event) +func (fp FakeProcessor) MutateAKS() error { + return fp.mutateAKS() } // ShouldProcess will return true @@ -30,8 +31,9 @@ func (fp FakeProcessor) ShouldProcess(event events.Event) (bool, *string) { } // NewFakeProcessor returns a fake processor struct. -func NewFakeProcessor(process func(events.Event) error) FakeProcessor { +func NewFakeProcessor(mutateAppGwy func() error, mutateAKS func() error) FakeProcessor { return FakeProcessor{ - processFunc: process, + mutateAppGwy: mutateAppGwy, + mutateAKS: mutateAKS, } } diff --git a/pkg/worker/types.go b/pkg/worker/types.go index 0315a5ca9..66a720c00 100644 --- a/pkg/worker/types.go +++ b/pkg/worker/types.go @@ -11,8 +11,8 @@ import ( // EventProcessor provides a mechanism to act on events in the internal queue. type EventProcessor interface { - MutateAppGateway(events.Event) error - MutateAKS(events.Event) error + MutateAppGateway() error + MutateAKS() error ShouldProcess(events.Event) (bool, *string) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index d9782aa5b..57210bee6 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -16,14 +16,14 @@ import ( const sleepOnErrorSeconds = 5 func drainChan(ch chan events.Event, defaultEvent events.Event) events.Event { - final := defaultEvent + lastEvent := defaultEvent glog.V(9).Infof("Draining %d events from work channel", len(ch)) for { select { case event := <-ch: - final = event + lastEvent = event default: - return final + return lastEvent } } } @@ -43,14 +43,14 @@ func (w *Worker) Run(work chan events.Event, stopChannel chan struct{}) { continue } - if err := w.MutateAKS(event); err != nil { + _ = drainChan(work, event) + if err := w.MutateAKS(); err != nil { + glog.Error("Error mutating AKS from k8s event. ", err) } - lastEvent := drainChan(work, event) - - if err := w.MutateAppGateway(lastEvent); err != nil { - glog.Error("Processing event failed:", err) + if err := w.MutateAppGateway(); err != nil { + glog.Error("Error mutating App Gateway config from k8s event. ", err) time.Sleep(sleepOnErrorSeconds * time.Second) } diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index a3b07b5ab..19fc849c7 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -31,10 +31,15 @@ var _ = Describe("Worker Test", func() { Context("Check that worker executes the process", func() { It("Should be able to run process func", func() { backChannel := make(chan struct{}) - eventProcessor := NewFakeProcessor(func(events.Event) error { + mutateAppGw := func() error { backChannel <- struct{}{} return nil - }) + } + mutateAKS := func() error { + backChannel <- struct{}{} + return nil + } + eventProcessor := NewFakeProcessor(mutateAppGw, mutateAKS) worker := Worker{ EventProcessor: eventProcessor, }